[openrtm-commit:03344] r1032 - trunk/OpenRTM-aist-Python/OpenRTM_aist

openrtm @ openrtm.org openrtm @ openrtm.org
2018年 10月 10日 (水) 15:03:32 JST


Author: miyamoto
Date: 2018-10-10 15:03:32 +0900 (Wed, 10 Oct 2018)
New Revision: 1032

Modified:
   trunk/OpenRTM-aist-Python/OpenRTM_aist/InPort.py
   trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortConnector.py
   trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortCorbaCdrProvider.py
   trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortPushConnector.py
   trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortConnector.py
   trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortCorbaCdrProvider.py
   trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortDSProvider.py
   trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortPullConnector.py
   trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortSHMProvider.py
Log:
[incompat] Implementation of synchronous data port

Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/InPort.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/InPort.py	2018-10-09 00:04:48 UTC (rev 1031)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/InPort.py	2018-10-10 06:03:32 UTC (rev 1032)
@@ -229,7 +229,7 @@
   # @endif
   #
   # bool isEmpty()
-  def isEmpty(self):
+  def isEmpty(self, names=None):
     self._rtcout.RTC_TRACE("isEmpty()")
     if self._directNewData == True:
       return False
@@ -237,10 +237,33 @@
       self._rtcout.RTC_DEBUG("no connectors")
       return True
 
-    r = self._connectors[0].getBuffer().readable()
-    if r == 0:
-      self._rtcout.RTC_DEBUG("isEmpty() = true, buffer is empty")
-      return True
+    if names is None:
+      r = self._connectors[0].getBuffer().readable()
+      if r == 0:
+        self._rtcout.RTC_DEBUG("isEmpty() = true, buffer is empty")
+        return True
+    elif isinstance(names, str):
+      for con in self._connectors:
+        if  con.name() == names:
+          r = con.getBuffer().readable()
+          if r == 0:
+            self._rtcout.RTC_DEBUG("isEmpty() = True, connector name: %s, buffer is empty",(names))
+            return True
+          else:
+            self._rtcout.RTC_DEBUG("isEmpty() = False, connector name: %s, readable data: %d",(names,r))
+            return False
+    elif isinstance(names, list):
+      del names[:]
+      for con in self._connectors:
+          r = con.getBuffer().readable()
+          if r == 0:
+            self._rtcout.RTC_DEBUG("isEmpty() = True, connector name: %s",(names))
+            names.append(con.name())
+      if len(names) > 0:
+        return True
+    else:
+        self._rtcout.RTC_DEBUG("isEmpty() = False, no readable data")
+        return False
       
     self._rtcout.RTC_DEBUG("isEmpty() = false, data exists in the buffer")
     return False

Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortConnector.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortConnector.py	2018-10-09 00:04:48 UTC (rev 1031)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortConnector.py	2018-10-10 06:03:32 UTC (rev 1032)
@@ -211,3 +211,9 @@
   # void setDataTyep(DataType data);
   def setDataType(self, data):
     self._dataType = data
+
+
+  def write(self, data):
+    pass
+  def read(self, data):
+    pass
\ No newline at end of file

Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortCorbaCdrProvider.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortCorbaCdrProvider.py	2018-10-09 00:04:48 UTC (rev 1031)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortCorbaCdrProvider.py	2018-10-10 06:03:32 UTC (rev 1032)
@@ -171,7 +171,7 @@
     try:
       self._rtcout.RTC_PARANOID("InPortCorbaCdrProvider.put()")
             
-      if not self._buffer:
+      if not self._connector:
         self.onReceiverError(data)
         return OpenRTM.PORT_ERROR
 

Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortPushConnector.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortPushConnector.py	2018-10-09 00:04:48 UTC (rev 1031)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/InPortPushConnector.py	2018-10-10 06:03:32 UTC (rev 1032)
@@ -22,6 +22,7 @@
 from omniORB import any
 
 import OpenRTM_aist
+import threading
 
 
 ##
@@ -141,8 +142,21 @@
     self._provider.init(info.properties)
     self._provider.setBuffer(self._buffer)
     self._provider.setListener(info, self._listeners)
+    self.onConnect()
     
-    self.onConnect()
+
+    self._sync_readwrite = False
+    if OpenRTM_aist.toBool(info.properties.getProperty("sync_readwrite"),"YES","NO",False):
+      self._sync_readwrite = True
+      
+
+    
+    
+    
+
+    self._writecompleted_worker = InPortPushConnector.WorkerThreadCtrl()
+    self._readcompleted_worker = InPortPushConnector.WorkerThreadCtrl()
+    self._readready_worker = InPortPushConnector.WorkerThreadCtrl()
     return
 
     
@@ -212,8 +226,30 @@
       return self.PRECONDITION_NOT_MET
 
     cdr = [""]
+
+    if self._sync_readwrite:
+      self._readcompleted_worker._completed = False
+      
+      self._readready_worker._completed = True
+      self._readready_worker._cond.acquire()
+      self._readready_worker._cond.notify()
+      self._readready_worker._cond.release()
+
+      self._writecompleted_worker._cond.acquire()
+      while not self._writecompleted_worker._completed:
+        self._writecompleted_worker._cond.wait()
+      self._writecompleted_worker._cond.release()
+
     ret = self._buffer.read(cdr)
 
+    if self._sync_readwrite:
+      self._readcompleted_worker._completed = True
+      self._readcompleted_worker._cond.acquire()
+      self._readcompleted_worker._cond.notify()
+      self._readcompleted_worker._cond.release()
+      
+      self._readready_worker._completed = False
+
     if not self._dataType:
       return self.PRECONDITION_NOT_MET
     if self._endian is not None:
@@ -232,11 +268,11 @@
       return self.PORT_OK
 
     elif ret == OpenRTM_aist.BufferStatus.BUFFER_EMPTY:
-      self.onBufferEmpty()
+      self.onBufferEmpty(cdr[0])
       return self.BUFFER_EMPTY
 
     elif ret == OpenRTM_aist.BufferStatus.TIMEOUT:
-      self.onBufferReadTimeout()
+      self.onBufferReadTimeout(cdr[0])
       return self.BUFFER_TIMEOUT
 
     elif ret == OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET:
@@ -381,7 +417,28 @@
   #
   # ReturnCode write(const OpenRTM::CdrData& data);
   def write(self, data):
-    return self._buffer.write(data)
+    if self._sync_readwrite:
+      self._readready_worker._cond.acquire()
+      while not self._readready_worker._completed:
+        self._readready_worker._cond.wait()
+      self._readready_worker._cond.release()
+
+    ret = self._buffer.write(data)
+
+    if self._sync_readwrite:
+      self._writecompleted_worker._completed = True
+      self._writecompleted_worker._cond.acquire()
+      self._writecompleted_worker._cond.notify()
+      self._writecompleted_worker._cond.release()
+
+      self._readcompleted_worker._cond.acquire()
+      while not self._readcompleted_worker._completed:
+        self._readcompleted_worker._cond.wait()
+      self._readcompleted_worker._cond.release()
+      
+      self._writecompleted_worker._completed = False
+    
+    return ret
         
     
   ##
@@ -414,11 +471,17 @@
     if self._listeners and self._profile:
       self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_READ].notify(self._profile, data)
     return
-  def onBufferEmpty(self):
+  def onBufferEmpty(self, data):
     if self._listeners and self._profile:
       self._listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_BUFFER_EMPTY].notify(self._profile)
     return
-  def onBufferReadTimeout(self):
+  def onBufferReadTimeout(self, data):
     if self._listeners and self._profile:
       self._listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_BUFFER_READ_TIMEOUT].notify(self._profile)
     return
+
+  class WorkerThreadCtrl:
+    def __init__(self):
+      self._mutex = threading.RLock()
+      self._cond = threading.Condition(self._mutex)
+      self._completed = False  
\ No newline at end of file

Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortConnector.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortConnector.py	2018-10-09 00:04:48 UTC (rev 1031)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortConnector.py	2018-10-10 06:03:32 UTC (rev 1032)
@@ -184,3 +184,9 @@
   # const char* name();
   def directMode(self):
     return self._directMode
+
+
+  def write(self, data):
+    pass
+  def read(self, data):
+    pass
\ No newline at end of file

Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortCorbaCdrProvider.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortCorbaCdrProvider.py	2018-10-09 00:04:48 UTC (rev 1031)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortCorbaCdrProvider.py	2018-10-10 06:03:32 UTC (rev 1032)
@@ -283,17 +283,13 @@
   # virtual ::OpenRTM::PortStatus get(::OpenRTM::CdrData_out data);
   def get(self):
     self._rtcout.RTC_PARANOID("OutPortCorbaCdrProvider.get()")
-    if not self._buffer:
+    if not self._connector:
       self.onSenderError()
       return (OpenRTM.UNKNOWN_ERROR, "")
 
     try:
-      if self._buffer.empty():
-        self._rtcout.RTC_ERROR("buffer is empty.")
-        return (OpenRTM.BUFFER_EMPTY, "")
-
       cdr = [None]
-      ret = self._buffer.read(cdr)
+      ret = self._connector.read(cdr)
 
       if ret == OpenRTM_aist.BufferStatus.BUFFER_OK:
         if not cdr[0]:

Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortDSProvider.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortDSProvider.py	2018-10-09 00:04:48 UTC (rev 1031)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortDSProvider.py	2018-10-10 06:03:32 UTC (rev 1032)
@@ -286,17 +286,14 @@
   # virtual ::OpenRTM::PortStatus pull(::RTC::CdrData_out data);
   def pull(self):
     self._rtcout.RTC_PARANOID("OutPortDSProvider.pull()")
-    if not self._buffer:
+    if not self._connector:
       self.onSenderError()
       return (RTC.UNKNOWN_ERROR, "")
 
     try:
-      if self._buffer.empty():
-        self._rtcout.RTC_ERROR("buffer is empty.")
-        return (RTC.BUFFER_EMPTY, "")
 
       cdr = [None]
-      ret = self._buffer.read(cdr)
+      ret = self._connector.read(cdr)
 
       if ret == OpenRTM_aist.BufferStatus.BUFFER_OK:
         if not cdr[0]:

Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortPullConnector.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortPullConnector.py	2018-10-09 00:04:48 UTC (rev 1031)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortPullConnector.py	2018-10-10 06:03:32 UTC (rev 1032)
@@ -145,6 +145,15 @@
     self._provider.setConnector(self)
     self._provider.setListener(info, self._listeners)
     self.onConnect()
+
+    self._sync_readwrite = False
+    if OpenRTM_aist.toBool(info.properties.getProperty("sync_readwrite"),"YES","NO",False):
+      self._sync_readwrite = True
+
+    self._writecompleted_worker = OutPortPullConnector.WorkerThreadCtrl()
+    self._readcompleted_worker = OutPortPullConnector.WorkerThreadCtrl()
+    self._readready_worker = OutPortPullConnector.WorkerThreadCtrl()
+    
     return
 
 
@@ -195,12 +204,66 @@
       self._rtcout.RTC_ERROR("write(): endian %s is not support.",self._endian)
       return self.UNKNOWN_ERROR
     if self._buffer:
+      if self._sync_readwrite:
+        self._readready_worker._cond.acquire()
+        while not self._readready_worker._completed:
+          self._readready_worker._cond.wait()
+        self._readready_worker._cond.release()
+        
       self._buffer.write(cdr_data)
+      
+      if self._sync_readwrite:
+        self._writecompleted_worker._completed = True
+        self._writecompleted_worker._cond.acquire()
+        self._writecompleted_worker._cond.notify()
+        self._writecompleted_worker._cond.release()
+
+        self._readcompleted_worker._cond.acquire()
+        while not self._readcompleted_worker._completed:
+          self._readcompleted_worker._cond.wait()
+        self._readcompleted_worker._cond.release()
+      
+        self._writecompleted_worker._completed = False      
     else:
       return self.UNKNOWN_ERROR
     return self.PORT_OK
 
+  def read(self, data):
 
+    if self._sync_readwrite:
+      self._readcompleted_worker._completed = False
+      
+      self._readready_worker._completed = True
+      self._readready_worker._cond.acquire()
+      self._readready_worker._cond.notify()
+      self._readready_worker._cond.release()
+
+      self._writecompleted_worker._cond.acquire()
+      while not self._writecompleted_worker._completed:
+        self._writecompleted_worker._cond.wait()
+      self._writecompleted_worker._cond.release()
+
+
+    if self._buffer.empty():
+      self._rtcout.RTC_ERROR("buffer is empty.")
+      data[0] = ""
+      return OpenRTM_aist.BufferStatus.BUFFER_EMPTY
+      
+      
+    ret = self._buffer.read(data)
+
+    if self._sync_readwrite:
+      self._readcompleted_worker._completed = True
+      self._readcompleted_worker._cond.acquire()
+      self._readcompleted_worker._cond.notify()
+      self._readcompleted_worker._cond.release()
+      
+      self._readready_worker._completed = False
+
+
+    return ret
+    
+
   ##
   # @if jp
   # @brief 接続解除
@@ -340,5 +403,9 @@
   def setDirectMode(self):
     self._directMode = True
   
+  class WorkerThreadCtrl:
+    def __init__(self):
+      self._mutex = threading.RLock()
+      self._cond = threading.Condition(self._mutex)
+      self._completed = False  
 
-

Modified: trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortSHMProvider.py
===================================================================
--- trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortSHMProvider.py	2018-10-09 00:04:48 UTC (rev 1031)
+++ trunk/OpenRTM-aist-Python/OpenRTM_aist/OutPortSHMProvider.py	2018-10-10 06:03:32 UTC (rev 1032)
@@ -166,17 +166,14 @@
   def get(self):
     self._rtcout.RTC_PARANOID("OutPortSHMProvider.get()")
     
-    if not self._buffer:
+    if not self._connector:
       self.onSenderError()
       return OpenRTM.UNKNOWN_ERROR
 
     try:
-      if self._buffer.empty():
-        self._rtcout.RTC_ERROR("buffer is empty.")
-        return OpenRTM.BUFFER_EMPTY
 
       cdr = [None]
-      ret = self._buffer.read(cdr)
+      ret = self._connector.read(cdr)
       
       if ret == OpenRTM_aist.BufferStatus.BUFFER_OK:
         if cdr[0] is None:



openrtm-commit メーリングリストの案内