Actions
バグ #4470
closedサブスクリプション型がnewの場合に処理が停止することがある問題
Start date:
02/28/2018
Due date:
% Done:
100%
Estimated time:
Description
サブスクリプション型をnewにした場合に、データ転送中にコネクタを切断すると処理が停止することがあるため原因の調査、修正を行う。
Updated by n-miyamoto over 6 years ago
- Status changed from 担当 to 解決
- % Done changed from 0 to 100
RingBuffer.pyの以下の箇所で停止していることを確認しました。
def write(self, value, sec = -1, nsec = 0):
try:
self._full_cond.acquire()
if self.full():
#省略
elif not overwrite and timedwrite: # "block" mode
#省略
ret = self._full_cond.wait(wait_time)
PublisherNewはバッファフルの時にwrite関数を呼ぶとブロックするようになっています。
このブロックはread関数内で解除できます。
def read(self, value, sec = -1, nsec = 0):
#省略
self._full_cond.acquire()
full_ = self.full()
if full_:
self.advanceRptr()
self._full_cond.notify()
else:
self.advanceRptr()
self._full_cond.release()
ただ、PublisherNewのpushNew関数ではデータ読み込み時にread関数を使用していないため、バッファフルのブロックが解除されません。
def pushNew(self):
self._rtcout.RTC_TRACE("pushNew()")
try:
self._buffer.advanceRptr(self._buffer.readable() - 1)
cdr = self._buffer.get()
self.onBufferRead(cdr)
self.onSend(cdr)
ret = self._consumer.put(cdr)
if ret != self.PORT_OK:
self._rtcout.RTC_DEBUG("%s = consumer.put()", OpenRTM_aist.DataPortStatus.toString(ret))
return self.invokeListener(ret, cdr)
self.onReceived(cdr)
self._buffer.advanceRptr()
return self.PORT_OK
except:
self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
return self.CONNECTION_LOST
このため、バッファフル、バッファエンプティのブロック解除の処理をadvanceRptr関数、advanceWptrに移動しました。
def advanceRptr(self, n = 1, unlock_enable=True):
if unlock_enable and n > 0:
self._full_cond.acquire()
full_ = self.full()
guard = OpenRTM_aist.ScopedLock(self._pos_mutex)
if (n > 0 and n > self._fillcount) or \
(n < 0 and n < (self._fillcount - self._length)):
return OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET
self._rpos = (self._rpos + n + self._length) % self._length
self._fillcount -= n
if unlock_enable and n > 0:
if full_:
self._full_cond.notify()
self._full_cond.release()
return OpenRTM_aist.BufferStatus.BUFFER_OK
def advanceWptr(self, n = 1, unlock_enable=True):
if unlock_enable and n > 0:
self._empty_cond.acquire()
empty = self.empty()
guard = OpenRTM_aist.ScopedLock(self._pos_mutex)
if (n > 0 and n > (self._length - self._fillcount)) or \
(n < 0 and n < (-self._fillcount)):
return OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET
self._wpos = (self._wpos + n + self._length) % self._length
self._fillcount += n
if unlock_enable and n > 0:
if empty:
self._empty_cond.notify()
self._empty_cond.release()
return OpenRTM_aist.BufferStatus.BUFFER_OK
Actions