From 037e8307cfe1c057eb00c6179b171400a69db405 Mon Sep 17 00:00:00 2001 From: Steven Lee Date: Thu, 8 Nov 2012 14:35:02 -0500 Subject: [PATCH] Bug 805478 - Implement MessageLoopForIO::LineWatcher. r=dhylands --- ipc/chromium/src/base/message_loop.h | 1 + .../src/base/message_pump_libevent.cc | 43 +++++++++++++++++++ ipc/chromium/src/base/message_pump_libevent.h | 35 +++++++++++++++ 3 files changed, 79 insertions(+) diff --git a/ipc/chromium/src/base/message_loop.h b/ipc/chromium/src/base/message_loop.h index 0dc02dafc529..fb954b037c35 100644 --- a/ipc/chromium/src/base/message_loop.h +++ b/ipc/chromium/src/base/message_loop.h @@ -509,6 +509,7 @@ class MessageLoopForIO : public MessageLoop { typedef base::MessagePumpLibevent::Watcher Watcher; typedef base::MessagePumpLibevent::FileDescriptorWatcher FileDescriptorWatcher; + typedef base::LineWatcher LineWatcher; enum Mode { WATCH_READ = base::MessagePumpLibevent::WATCH_READ, diff --git a/ipc/chromium/src/base/message_pump_libevent.cc b/ipc/chromium/src/base/message_pump_libevent.cc index 9f31dbbfbde9..b4670ed2a759 100644 --- a/ipc/chromium/src/base/message_pump_libevent.cc +++ b/ipc/chromium/src/base/message_pump_libevent.cc @@ -15,6 +15,7 @@ #include "base/scoped_nsautorelease_pool.h" #include "base/scoped_ptr.h" #include "base/time.h" +#include "nsDependentSubstring.h" #include "third_party/libevent/event.h" // Lifecycle of struct event @@ -371,4 +372,46 @@ void MessagePumpLibevent::ScheduleDelayedWork(const Time& delayed_work_time) { delayed_work_time_ = delayed_work_time; } +void LineWatcher::OnFileCanReadWithoutBlocking(int aFd) +{ + ssize_t length = 0; + + while (true) { + length = read(aFd, mReceiveBuffer.get(), mBufferSize - mReceivedIndex); + DCHECK(length <= ssize_t(mBufferSize - mReceivedIndex)); + if (length <= 0) { + if (length < 0) { + if (errno == EINTR) { + continue; // retry system call when interrupted + } + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return; // no data available: return and re-poll + } + DLOG(ERROR) << "Can't read from fd, error " << errno; + } else { + DLOG(ERROR) << "End of file"; + } + // At this point, assume that we can't actually access + // the socket anymore, and indicate an error. + OnError(); + mReceivedIndex = 0; + return; + } + + while (length-- > 0) { + DCHECK(mReceivedIndex < mBufferSize); + if (mReceiveBuffer[mReceivedIndex] == mTerminator) { + nsDependentCSubstring message(mReceiveBuffer.get(), mReceivedIndex); + OnLineRead(aFd, message); + if (length > 0) { + DCHECK(mReceivedIndex < (mBufferSize - 1)); + memmove(&mReceiveBuffer[0], &mReceiveBuffer[mReceivedIndex + 1], length); + } + mReceivedIndex = 0; + } else { + mReceivedIndex++; + } + } + } +} } // namespace base diff --git a/ipc/chromium/src/base/message_pump_libevent.h b/ipc/chromium/src/base/message_pump_libevent.h index 6b1084499304..04ecb832df59 100644 --- a/ipc/chromium/src/base/message_pump_libevent.h +++ b/ipc/chromium/src/base/message_pump_libevent.h @@ -7,11 +7,14 @@ #include "base/message_pump.h" #include "base/time.h" +#include "nsAutoPtr.h" // Declare structs we need from libevent.h rather than including it struct event_base; struct event; +class nsDependentCSubstring; + namespace base { // Class to monitor sockets and issue callbacks when sockets are ready for I/O @@ -175,6 +178,38 @@ class MessagePumpLibevent : public MessagePump { DISALLOW_COPY_AND_ASSIGN(MessagePumpLibevent); }; +/** + * LineWatcher overrides OnFileCanReadWithoutBlocking. It separates the read + * data by mTerminator and passes each line to OnLineRead. + */ +class LineWatcher : public MessagePumpLibevent::Watcher +{ +public: + LineWatcher(char aTerminator, int aBufferSize) : mReceivedIndex(0), + mBufferSize(aBufferSize), + mTerminator(aTerminator) + { + mReceiveBuffer = new char[mBufferSize]; + } + + ~LineWatcher() {} + +protected: + /** + * OnError will be called when |read| returns error. Derived class should + * implement this function to handle error cases when needed. + */ + virtual void OnError() {} + virtual void OnLineRead(int aFd, nsDependentCSubstring& aMessage) = 0; + virtual void OnFileCanWriteWithoutBlocking(int /* aFd */) {} +private: + virtual void OnFileCanReadWithoutBlocking(int aFd) MOZ_FINAL; + + nsAutoPtr mReceiveBuffer; + int mReceivedIndex; + int mBufferSize; + char mTerminator; +}; } // namespace base #endif // BASE_MESSAGE_PUMP_LIBEVENT_H_