[openrtm-commit:03338] r3417 - trunk/OpenRTM-aist/src/lib/rtm

openrtm @ openrtm.org openrtm @ openrtm.org
2018年 10月 9日 (火) 08:58:58 JST


Author: miyamoto
Date: 2018-10-09 08:58:57 +0900 (Tue, 09 Oct 2018)
New Revision: 3417

Modified:
   trunk/OpenRTM-aist/src/lib/rtm/InPortConnector.cpp
   trunk/OpenRTM-aist/src/lib/rtm/InPortConnector.h
   trunk/OpenRTM-aist/src/lib/rtm/InPortCorbaCdrProvider.cpp
   trunk/OpenRTM-aist/src/lib/rtm/InPortDSProvider.cpp
   trunk/OpenRTM-aist/src/lib/rtm/InPortPushConnector.cpp
   trunk/OpenRTM-aist/src/lib/rtm/InPortPushConnector.h
   trunk/OpenRTM-aist/src/lib/rtm/InPortSHMProvider.cpp
   trunk/OpenRTM-aist/src/lib/rtm/OutPortConnector.cpp
   trunk/OpenRTM-aist/src/lib/rtm/OutPortConnector.h
   trunk/OpenRTM-aist/src/lib/rtm/OutPortCorbaCdrProvider.cpp
   trunk/OpenRTM-aist/src/lib/rtm/OutPortDSProvider.cpp
   trunk/OpenRTM-aist/src/lib/rtm/OutPortPullConnector.cpp
   trunk/OpenRTM-aist/src/lib/rtm/OutPortPullConnector.h
   trunk/OpenRTM-aist/src/lib/rtm/OutPortSHMProvider.cpp
Log:
[incompat] Implementation of synchronous data port

Modified: trunk/OpenRTM-aist/src/lib/rtm/InPortConnector.cpp
===================================================================
--- trunk/OpenRTM-aist/src/lib/rtm/InPortConnector.cpp	2018-10-08 04:04:30 UTC (rev 3416)
+++ trunk/OpenRTM-aist/src/lib/rtm/InPortConnector.cpp	2018-10-08 23:58:57 UTC (rev 3417)
@@ -168,4 +168,9 @@
 	  }
   };
 
+  BufferStatus::Enum InPortConnector::write(cdrMemoryStream &cdr)
+  {
+      return BufferStatus::BUFFER_OK;
+  };
+
 }; // namespace RTC

Modified: trunk/OpenRTM-aist/src/lib/rtm/InPortConnector.h
===================================================================
--- trunk/OpenRTM-aist/src/lib/rtm/InPortConnector.h	2018-10-08 04:04:30 UTC (rev 3416)
+++ trunk/OpenRTM-aist/src/lib/rtm/InPortConnector.h	2018-10-08 23:58:57 UTC (rev 3417)
@@ -236,7 +236,9 @@
      */
     virtual bool isLittleEndian();
 
+    virtual BufferStatus::Enum write(cdrMemoryStream &cdr);
 
+
 	/*!
 	* @if jp
 	* @brief データをダイレクトに書き込むためのOutPortのサーバントを設定する

Modified: trunk/OpenRTM-aist/src/lib/rtm/InPortCorbaCdrProvider.cpp
===================================================================
--- trunk/OpenRTM-aist/src/lib/rtm/InPortCorbaCdrProvider.cpp	2018-10-08 04:04:30 UTC (rev 3416)
+++ trunk/OpenRTM-aist/src/lib/rtm/InPortCorbaCdrProvider.cpp	2018-10-08 23:58:57 UTC (rev 3417)
@@ -33,7 +33,7 @@
    * @endif
    */
   InPortCorbaCdrProvider::InPortCorbaCdrProvider(void)
-  : m_buffer(0)
+      : m_buffer(0), m_connector(NULL)
   {
     // PortProfile setting
     setInterfaceType("corba_cdr");
@@ -155,7 +155,7 @@
   {
     RTC_PARANOID(("InPortCorbaCdrProvider::put()"));
 
-    if (m_buffer == 0)
+    if (m_connector == NULL)
       {
         cdrMemoryStream cdr;
 #ifdef ORB_IS_ORBEXPRESS
@@ -193,7 +193,7 @@
 
 
     onReceived(cdr);
-    BufferStatus::Enum ret = m_buffer->write(cdr);
+    BufferStatus::Enum ret = m_connector->write(cdr);
 
     return convertReturn(ret, cdr);
   }

Modified: trunk/OpenRTM-aist/src/lib/rtm/InPortDSProvider.cpp
===================================================================
--- trunk/OpenRTM-aist/src/lib/rtm/InPortDSProvider.cpp	2018-10-08 04:04:30 UTC (rev 3416)
+++ trunk/OpenRTM-aist/src/lib/rtm/InPortDSProvider.cpp	2018-10-08 23:58:57 UTC (rev 3417)
@@ -31,7 +31,7 @@
    * @endif
    */
   InPortDSProvider::InPortDSProvider(void)
-  : m_buffer(0)
+      : m_buffer(0), m_connector(0)
   {
     // PortProfile setting
     setInterfaceType("data_service");
@@ -153,7 +153,7 @@
   {
     RTC_PARANOID(("InPortDSProvider::push()"));
 
-    if (m_buffer == 0)
+    if (m_connector == 0)
       {
         cdrMemoryStream cdr;
 #ifdef ORB_IS_ORBEXPRESS
@@ -191,7 +191,7 @@
 
 
     onReceived(cdr);
-    BufferStatus::Enum ret = m_buffer->write(cdr);
+    BufferStatus::Enum ret = m_connector->write(cdr);
 
     return convertReturn(ret, cdr);
   }

Modified: trunk/OpenRTM-aist/src/lib/rtm/InPortPushConnector.cpp
===================================================================
--- trunk/OpenRTM-aist/src/lib/rtm/InPortPushConnector.cpp	2018-10-08 04:04:30 UTC (rev 3416)
+++ trunk/OpenRTM-aist/src/lib/rtm/InPortPushConnector.cpp	2018-10-08 23:58:57 UTC (rev 3417)
@@ -39,7 +39,8 @@
     : InPortConnector(info, listeners, buffer),
       m_provider(provider),
       m_listeners(listeners),
-      m_deleteBuffer(buffer == 0 ? true : false)
+      m_deleteBuffer(buffer == 0 ? true : false),
+      m_sync_readwrite(false)
   {
     // publisher/buffer creation. This may throw std::bad_alloc;
     if (m_buffer == 0)
@@ -53,6 +54,11 @@
     m_provider->setBuffer(m_buffer);
     m_provider->setListener(info, &m_listeners);
 
+    if (coil::toBool(info.properties["sync_readwrite"], "YES", "NO", false))
+    {
+        m_sync_readwrite = true;
+    }
+
     onConnect();
   }
 
@@ -91,7 +97,44 @@
       {
         return PRECONDITION_NOT_MET;
       }
+    if (m_sync_readwrite)
+    {
+    
+        {
+            Guard guard(m_readcompleted_worker.mutex_);
+            m_readcompleted_worker.completed_ = false;
+        }
+
+        {
+            Guard guard(m_readready_worker.mutex_);
+            m_readready_worker.completed_ = true;
+            m_readready_worker.cond_.signal();
+        }
+        {
+            Guard guard(m_writecompleted_worker.mutex_);
+            while (!m_writecompleted_worker.completed_)
+            {
+                m_writecompleted_worker.cond_.wait();
+            }
+        }
+    }
+
     BufferStatus::Enum ret = m_buffer->read(data);
+
+    if (m_sync_readwrite)
+    {
+        {
+            Guard guard(m_readcompleted_worker.mutex_);
+            m_readcompleted_worker.completed_ = true;
+            m_readcompleted_worker.cond_.signal();
+        }
+
+        {
+            Guard guard(m_readready_worker.mutex_);
+            m_readready_worker.completed_ = false;
+        }
+    }
+
     switch (ret)
       {
       case BufferStatus::BUFFER_OK:
@@ -182,5 +225,46 @@
     m_listeners.connector_[ON_DISCONNECT].notify(m_profile);
   }
 
+  BufferStatus::Enum InPortPushConnector::write(cdrMemoryStream &cdr)
+  {
+      if (m_sync_readwrite)
+      {
+          {
+              Guard guard(m_readready_worker.mutex_);
+              while (!m_readready_worker.completed_)
+              {
+                  m_readready_worker.cond_.wait();
+              }
+          }
+      }
+
+      BufferStatus::Enum ret = m_buffer->write(cdr);
+
+      if (m_sync_readwrite)
+      {
+          {
+              Guard guard(m_writecompleted_worker.mutex_);
+              m_writecompleted_worker.completed_ = true;
+              m_writecompleted_worker.cond_.signal();
+          }
+
+
+          {
+              Guard guard(m_readcompleted_worker.mutex_);
+              while (!m_readcompleted_worker.completed_)
+              {
+                  m_readcompleted_worker.cond_.wait();
+              }
+          }
+          {
+              Guard guard(m_writecompleted_worker.mutex_);
+              m_writecompleted_worker.completed_ = false;
+          }
+      }
+
+
+      return ret;
+  };
+
 };  // namespace RTC
 

Modified: trunk/OpenRTM-aist/src/lib/rtm/InPortPushConnector.h
===================================================================
--- trunk/OpenRTM-aist/src/lib/rtm/InPortPushConnector.h	2018-10-08 04:04:30 UTC (rev 3416)
+++ trunk/OpenRTM-aist/src/lib/rtm/InPortPushConnector.h	2018-10-08 23:58:57 UTC (rev 3417)
@@ -80,6 +80,7 @@
   class InPortPushConnector
     : public InPortConnector
   {
+    typedef coil::Guard<coil::Mutex> Guard;
   public:
     DATAPORTSTATUS_ENUM
 
@@ -254,6 +255,8 @@
      */
     virtual CdrBufferBase* createBuffer(ConnectorInfo& info);
 
+    virtual BufferStatus::Enum write(cdrMemoryStream &cdr);
+
     /*!
      * @if jp
      * @brief 接続確立時にコールバックを呼ぶ
@@ -310,6 +313,20 @@
     ConnectorListeners& m_listeners;
 
     bool m_deleteBuffer;
+
+    bool m_sync_readwrite;
+
+    struct WorkerThreadCtrl
+    {
+        WorkerThreadCtrl() : cond_(mutex_), completed_(false) {}
+        coil::Mutex mutex_;
+        coil::Condition<coil::Mutex> cond_;
+        bool completed_;
+    };
+    WorkerThreadCtrl m_writecompleted_worker;
+    WorkerThreadCtrl m_readcompleted_worker;
+    WorkerThreadCtrl m_readready_worker;
+
   };
 };  // namespace RTC
 

Modified: trunk/OpenRTM-aist/src/lib/rtm/InPortSHMProvider.cpp
===================================================================
--- trunk/OpenRTM-aist/src/lib/rtm/InPortSHMProvider.cpp	2018-10-08 04:04:30 UTC (rev 3416)
+++ trunk/OpenRTM-aist/src/lib/rtm/InPortSHMProvider.cpp	2018-10-08 23:58:57 UTC (rev 3417)
@@ -123,7 +123,7 @@
     throw (CORBA::SystemException)
   {
     RTC_PARANOID(("InPortSHMProvider::put()"));
-	if (m_buffer == 0)
+    if (m_connector == NULL)
 	{
 		return ::OpenRTM::PORT_ERROR;
 	}
@@ -152,7 +152,7 @@
 	
 	onReceived(cdr);
 	
-	BufferStatus::Enum ret = m_buffer->write(cdr);
+    BufferStatus::Enum ret = m_connector->write(cdr);
 	
 	return convertReturn(ret, cdr);
   }

Modified: trunk/OpenRTM-aist/src/lib/rtm/OutPortConnector.cpp
===================================================================
--- trunk/OpenRTM-aist/src/lib/rtm/OutPortConnector.cpp	2018-10-08 04:04:30 UTC (rev 3416)
+++ trunk/OpenRTM-aist/src/lib/rtm/OutPortConnector.cpp	2018-10-08 23:58:57 UTC (rev 3417)
@@ -177,4 +177,9 @@
 	  m_inPortListeners = &(directInPort->getListeners());
 	  return true;
   }
+
+  CdrBufferBase::ReturnCode OutPortConnector::read(cdrMemoryStream &data)
+  {
+      return CdrBufferBase::BUFFER_OK;
+  }
 }; // namespace RTC

Modified: trunk/OpenRTM-aist/src/lib/rtm/OutPortConnector.h
===================================================================
--- trunk/OpenRTM-aist/src/lib/rtm/OutPortConnector.h	2018-10-08 04:04:30 UTC (rev 3416)
+++ trunk/OpenRTM-aist/src/lib/rtm/OutPortConnector.h	2018-10-08 23:58:57 UTC (rev 3417)
@@ -273,6 +273,8 @@
       return write(m_cdr);
     }
 
+    virtual CdrBufferBase::ReturnCode read(cdrMemoryStream &data);
+
 	bool setInPort(InPortBase* directInPort);
 	/*!
 	* @if jp

Modified: trunk/OpenRTM-aist/src/lib/rtm/OutPortCorbaCdrProvider.cpp
===================================================================
--- trunk/OpenRTM-aist/src/lib/rtm/OutPortCorbaCdrProvider.cpp	2018-10-08 04:04:30 UTC (rev 3416)
+++ trunk/OpenRTM-aist/src/lib/rtm/OutPortCorbaCdrProvider.cpp	2018-10-08 23:58:57 UTC (rev 3417)
@@ -33,7 +33,7 @@
    * @endif
    */
   OutPortCorbaCdrProvider::OutPortCorbaCdrProvider(void)
-    : m_buffer(0)
+    : m_buffer(0), m_connector(NULL)
   {
     // PortProfile setting
     setInterfaceType("corba_cdr");
@@ -169,7 +169,7 @@
     // at least the output "data" area should be allocated
     data = new ::OpenRTM::CdrData();
 
-    if (m_buffer == 0)
+    if (m_connector == NULL)
       {
         onSenderError();
         return ::OpenRTM::UNKNOWN_ERROR;
@@ -176,7 +176,7 @@
       }
 
     cdrMemoryStream cdr = cdrMemoryStream();
-    CdrBufferBase::ReturnCode ret(m_buffer->read(cdr));
+    CdrBufferBase::ReturnCode ret(m_connector->read(cdr));
 
     if (ret == CdrBufferBase::BUFFER_OK)
       {

Modified: trunk/OpenRTM-aist/src/lib/rtm/OutPortDSProvider.cpp
===================================================================
--- trunk/OpenRTM-aist/src/lib/rtm/OutPortDSProvider.cpp	2018-10-08 04:04:30 UTC (rev 3416)
+++ trunk/OpenRTM-aist/src/lib/rtm/OutPortDSProvider.cpp	2018-10-08 23:58:57 UTC (rev 3417)
@@ -31,7 +31,7 @@
    * @endif
    */
   OutPortDSProvider::OutPortDSProvider(void)
-    : m_buffer(0)
+    : m_buffer(0), m_connector(NULL)
   {
     // PortProfile setting
     setInterfaceType("data_service");
@@ -167,7 +167,7 @@
     // at least the output "data" area should be allocated
     data = new ::RTC::OctetSeq();
 
-    if (m_buffer == 0)
+    if (m_connector == NULL)
       {
         onSenderError();
         return ::RTC::UNKNOWN_ERROR;
@@ -174,7 +174,7 @@
       }
 
     cdrMemoryStream cdr = cdrMemoryStream();
-    CdrBufferBase::ReturnCode ret(m_buffer->read(cdr));
+    CdrBufferBase::ReturnCode ret(m_connector->read(cdr));
 
     if (ret == CdrBufferBase::BUFFER_OK)
       {

Modified: trunk/OpenRTM-aist/src/lib/rtm/OutPortPullConnector.cpp
===================================================================
--- trunk/OpenRTM-aist/src/lib/rtm/OutPortPullConnector.cpp	2018-10-08 04:04:30 UTC (rev 3416)
+++ trunk/OpenRTM-aist/src/lib/rtm/OutPortPullConnector.cpp	2018-10-08 23:58:57 UTC (rev 3417)
@@ -38,7 +38,8 @@
     : OutPortConnector(info, listeners),
       m_provider(provider),
       m_listeners(listeners),
-      m_buffer(buffer)
+      m_buffer(buffer),
+      m_sync_readwrite(false)
   {
     // create buffer
     if (m_buffer == 0)
@@ -54,6 +55,11 @@
     //    m_provider->init(m_profile /* , m_listeners */);
     m_provider->setListener(info, &m_listeners);
 
+    if (coil::toBool(info.properties["sync_readwrite"], "YES", "NO", false))
+    {
+        m_sync_readwrite = true;
+    }
+
     onConnect();
   }
 
@@ -80,10 +86,101 @@
   ConnectorBase::ReturnCode
   OutPortPullConnector::write(cdrMemoryStream& data)
   {
+
+    if (m_buffer == 0)
+    {
+        return PRECONDITION_NOT_MET;
+    }
+
+    if (m_sync_readwrite)
+    {
+        {
+            Guard guard(m_readready_worker.mutex_);
+            while (!m_readready_worker.completed_)
+            {
+                m_readready_worker.cond_.wait();
+            }
+        }
+    }
+
     m_buffer->write(data);
+
+    if (m_sync_readwrite)
+    {
+        {
+            Guard guard(m_writecompleted_worker.mutex_);
+            m_writecompleted_worker.completed_ = true;
+            m_writecompleted_worker.cond_.signal();
+        }
+
+
+          {
+              Guard guard(m_readcompleted_worker.mutex_);
+              while (!m_readcompleted_worker.completed_)
+              {
+                  m_readcompleted_worker.cond_.wait();
+              }
+          }
+          {
+              Guard guard(m_writecompleted_worker.mutex_);
+              m_writecompleted_worker.completed_ = false;
+          }
+    }
+
     return PORT_OK;
   }
 
+  CdrBufferBase::ReturnCode
+  OutPortPullConnector::read(cdrMemoryStream &data)
+  {
+      if (m_buffer == 0)
+      {
+          return CdrBufferBase::PRECONDITION_NOT_MET;
+      }
+
+      if (m_sync_readwrite)
+      {
+
+        {
+            Guard guard(m_readcompleted_worker.mutex_);
+            m_readcompleted_worker.completed_ = false;
+        }
+
+        {
+            Guard guard(m_readready_worker.mutex_);
+            m_readready_worker.completed_ = true;
+            m_readready_worker.cond_.signal();
+        }
+        {
+            Guard guard(m_writecompleted_worker.mutex_);
+            while (!m_writecompleted_worker.completed_)
+            {
+                m_writecompleted_worker.cond_.wait();
+            }
+        }
+      }
+
+      CdrBufferBase::ReturnCode ret = m_buffer->read(data);
+
+
+      if (m_sync_readwrite)
+      {
+        {
+            Guard guard(m_readcompleted_worker.mutex_);
+            m_readcompleted_worker.completed_ = true;
+            m_readcompleted_worker.cond_.signal();
+        }
+
+        {
+            Guard guard(m_readready_worker.mutex_);
+            m_readready_worker.completed_ = false;
+        }
+      }
+
+
+      return ret;
+  }
+
   /*!
    * @if jp
    * @brief 接続解除関数

Modified: trunk/OpenRTM-aist/src/lib/rtm/OutPortPullConnector.h
===================================================================
--- trunk/OpenRTM-aist/src/lib/rtm/OutPortPullConnector.h	2018-10-08 04:04:30 UTC (rev 3416)
+++ trunk/OpenRTM-aist/src/lib/rtm/OutPortPullConnector.h	2018-10-08 23:58:57 UTC (rev 3417)
@@ -81,6 +81,7 @@
   class OutPortPullConnector
     : public OutPortConnector
   {
+    typedef coil::Guard<coil::Mutex> Guard;
   public:
     DATAPORTSTATUS_ENUM
 
@@ -164,6 +165,8 @@
      */
     virtual ReturnCode write(cdrMemoryStream& data);
 
+    virtual CdrBufferBase::ReturnCode read(cdrMemoryStream &data);
+
     /*!
      * @if jp
      * @brief 接続解除
@@ -282,6 +285,20 @@
      * @endif
      */
     CdrBufferBase* m_buffer;
+  private:
+      bool m_sync_readwrite;
+
+      struct WorkerThreadCtrl
+      {
+          WorkerThreadCtrl() : cond_(mutex_), completed_(false) {}
+          coil::Mutex mutex_;
+          coil::Condition<coil::Mutex> cond_;
+          bool completed_;
+      };
+      WorkerThreadCtrl m_writecompleted_worker;
+      WorkerThreadCtrl m_readcompleted_worker;
+      WorkerThreadCtrl m_readready_worker;
+
   };
 };  // namespace RTC
 

Modified: trunk/OpenRTM-aist/src/lib/rtm/OutPortSHMProvider.cpp
===================================================================
--- trunk/OpenRTM-aist/src/lib/rtm/OutPortSHMProvider.cpp	2018-10-08 04:04:30 UTC (rev 3416)
+++ trunk/OpenRTM-aist/src/lib/rtm/OutPortSHMProvider.cpp	2018-10-08 23:58:57 UTC (rev 3417)
@@ -33,7 +33,8 @@
    */
   OutPortSHMProvider::OutPortSHMProvider(void)
    : m_buffer(0),
-     m_memory_size(0)
+     m_memory_size(0),
+     m_connector(NULL)
   {
     // PortProfile setting
     setInterfaceType("shared_memory");
@@ -172,7 +173,7 @@
       }
 
     cdrMemoryStream cdr;
-    CdrBufferBase::ReturnCode ret(m_buffer->read(cdr));
+    CdrBufferBase::ReturnCode ret(m_connector->read(cdr));
     if (ret == CdrBufferBase::BUFFER_OK)
       {
 #ifdef ORB_IS_ORBEXPRESS



openrtm-commit メーリングリストの案内