Bug 805478 - Implement MessageLoopForIO::LineWatcher. r=dhylands

This commit is contained in:
Steven Lee 2012-11-08 14:35:02 -05:00
parent b38f788032
commit 037e8307cf
3 changed files with 79 additions and 0 deletions

View File

@ -509,6 +509,7 @@ class MessageLoopForIO : public MessageLoop {
typedef base::MessagePumpLibevent::Watcher Watcher;
typedef base::MessagePumpLibevent::FileDescriptorWatcher
FileDescriptorWatcher;
typedef base::LineWatcher LineWatcher;
enum Mode {
WATCH_READ = base::MessagePumpLibevent::WATCH_READ,

View File

@ -15,6 +15,7 @@
#include "base/scoped_nsautorelease_pool.h"
#include "base/scoped_ptr.h"
#include "base/time.h"
#include "nsDependentSubstring.h"
#include "third_party/libevent/event.h"
// Lifecycle of struct event
@ -371,4 +372,46 @@ void MessagePumpLibevent::ScheduleDelayedWork(const Time& delayed_work_time) {
delayed_work_time_ = delayed_work_time;
}
void LineWatcher::OnFileCanReadWithoutBlocking(int aFd)
{
ssize_t length = 0;
while (true) {
length = read(aFd, mReceiveBuffer.get(), mBufferSize - mReceivedIndex);
DCHECK(length <= ssize_t(mBufferSize - mReceivedIndex));
if (length <= 0) {
if (length < 0) {
if (errno == EINTR) {
continue; // retry system call when interrupted
}
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return; // no data available: return and re-poll
}
DLOG(ERROR) << "Can't read from fd, error " << errno;
} else {
DLOG(ERROR) << "End of file";
}
// At this point, assume that we can't actually access
// the socket anymore, and indicate an error.
OnError();
mReceivedIndex = 0;
return;
}
while (length-- > 0) {
DCHECK(mReceivedIndex < mBufferSize);
if (mReceiveBuffer[mReceivedIndex] == mTerminator) {
nsDependentCSubstring message(mReceiveBuffer.get(), mReceivedIndex);
OnLineRead(aFd, message);
if (length > 0) {
DCHECK(mReceivedIndex < (mBufferSize - 1));
memmove(&mReceiveBuffer[0], &mReceiveBuffer[mReceivedIndex + 1], length);
}
mReceivedIndex = 0;
} else {
mReceivedIndex++;
}
}
}
}
} // namespace base

View File

@ -7,11 +7,14 @@
#include "base/message_pump.h"
#include "base/time.h"
#include "nsAutoPtr.h"
// Declare structs we need from libevent.h rather than including it
struct event_base;
struct event;
class nsDependentCSubstring;
namespace base {
// Class to monitor sockets and issue callbacks when sockets are ready for I/O
@ -175,6 +178,38 @@ class MessagePumpLibevent : public MessagePump {
DISALLOW_COPY_AND_ASSIGN(MessagePumpLibevent);
};
/**
* LineWatcher overrides OnFileCanReadWithoutBlocking. It separates the read
* data by mTerminator and passes each line to OnLineRead.
*/
class LineWatcher : public MessagePumpLibevent::Watcher
{
public:
LineWatcher(char aTerminator, int aBufferSize) : mReceivedIndex(0),
mBufferSize(aBufferSize),
mTerminator(aTerminator)
{
mReceiveBuffer = new char[mBufferSize];
}
~LineWatcher() {}
protected:
/**
* OnError will be called when |read| returns error. Derived class should
* implement this function to handle error cases when needed.
*/
virtual void OnError() {}
virtual void OnLineRead(int aFd, nsDependentCSubstring& aMessage) = 0;
virtual void OnFileCanWriteWithoutBlocking(int /* aFd */) {}
private:
virtual void OnFileCanReadWithoutBlocking(int aFd) MOZ_FINAL;
nsAutoPtr<char> mReceiveBuffer;
int mReceivedIndex;
int mBufferSize;
char mTerminator;
};
} // namespace base
#endif // BASE_MESSAGE_PUMP_LIBEVENT_H_