Bug 805478 - NetdClient and VolumeManager inherit MessageLoopForIO::LineWatcher. r=dhylands

This commit is contained in:
Steven Lee 2012-11-08 14:35:02 -05:00
parent 037e8307cf
commit 02b3275657
4 changed files with 62 additions and 137 deletions

View File

@ -32,9 +32,9 @@ VolumeManager::StateObserverList VolumeManager::mStateObserverList;
/***************************************************************************/
VolumeManager::VolumeManager()
: mSocket(-1),
mCommandPending(false),
mRcvIdx(0)
: LineWatcher('\0', kRcvBufSize),
mSocket(-1),
mCommandPending(false)
{
DBG("VolumeManager constructor called");
}
@ -272,75 +272,37 @@ VolumeManager::WriteCommandData()
}
void
VolumeManager::OnFileCanReadWithoutBlocking(int aFd)
VolumeManager::OnLineRead(int aFd, nsDependentCSubstring& aMessage)
{
MOZ_ASSERT(aFd == mSocket.get());
while (true) {
ssize_t bytesRemaining = read(aFd, &mRcvBuf[mRcvIdx], sizeof(mRcvBuf) - mRcvIdx);
if (bytesRemaining < 0) {
if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
return;
}
if (errno == EINTR) {
continue;
}
ERR("Unknown read error: %d (%s) - restarting", errno, strerror(errno));
Restart();
return;
}
if (bytesRemaining == 0) {
// This means that vold probably crashed
ERR("Vold appears to have crashed - restarting");
Restart();
return;
}
// We got some data. Each line is terminated by a null character
DBG("Read %ld bytes", bytesRemaining);
while (bytesRemaining > 0) {
bytesRemaining--;
if (mRcvBuf[mRcvIdx] == '\0') {
// We found a line terminator. Each line is formatted as an
// integer response code followed by the rest of the line.
// Fish out the response code.
char *endPtr;
int responseCode = strtol(mRcvBuf, &endPtr, 10);
if (*endPtr == ' ') {
endPtr++;
}
char *endPtr;
int responseCode = strtol(aMessage.Data(), &endPtr, 10);
if (*endPtr == ' ') {
endPtr++;
}
// Now fish out the rest of the line after the response code
nsDependentCString responseLine(endPtr, &mRcvBuf[mRcvIdx] - endPtr);
DBG("Rcvd: %d '%s'", responseCode, responseLine.Data());
// Now fish out the rest of the line after the response code
nsDependentCString responseLine(endPtr, aMessage.Length() - (endPtr - aMessage.Data()));
DBG("Rcvd: %d '%s'", responseCode, responseLine.Data());
if (responseCode >= ResponseCode::UnsolicitedInformational) {
// These are unsolicited broadcasts. We intercept these and process
// them ourselves
HandleBroadcast(responseCode, responseLine);
} else {
// Everything else is considered to be part of the command response.
if (mCommands.size() > 0) {
VolumeCommand *cmd = mCommands.front();
cmd->HandleResponse(responseCode, responseLine);
if (responseCode >= ResponseCode::CommandOkay) {
// That's a terminating response. We can remove the command.
mCommands.pop();
mCommandPending = false;
// Start the next command, if there is one.
WriteCommandData();
}
} else {
ERR("Response with no command");
}
}
if (bytesRemaining > 0) {
// There is data in the receive buffer beyond the current line.
// Shift it down to the beginning.
memmove(&mRcvBuf[0], &mRcvBuf[mRcvIdx + 1], bytesRemaining);
}
mRcvIdx = 0;
} else {
mRcvIdx++;
if (responseCode >= ResponseCode::UnsolicitedInformational) {
// These are unsolicited broadcasts. We intercept these and process
// them ourselves
HandleBroadcast(responseCode, responseLine);
} else {
// Everything else is considered to be part of the command response.
if (mCommands.size() > 0) {
VolumeCommand *cmd = mCommands.front();
cmd->HandleResponse(responseCode, responseLine);
if (responseCode >= ResponseCode::CommandOkay) {
// That's a terminating response. We can remove the command.
mCommands.pop();
mCommandPending = false;
// Start the next command, if there is one.
WriteCommandData();
}
} else {
ERR("Response with no command");
}
}
}
@ -382,7 +344,6 @@ VolumeManager::Restart()
}
mCommandPending = false;
mSocket.dispose();
mRcvIdx = 0;
Start();
}
@ -405,6 +366,12 @@ VolumeManager::Start()
}
}
void
VolumeManager::OnError()
{
Restart();
}
/***************************************************************************/
static void

View File

@ -73,7 +73,7 @@ namespace system {
*
***************************************************************************/
class VolumeManager : public MessageLoopForIO::Watcher,
class VolumeManager : public MessageLoopForIO::LineWatcher,
public RefCounted<VolumeManager>
{
public:
@ -131,8 +131,9 @@ public:
protected:
virtual void OnFileCanReadWithoutBlocking(int aFd);
virtual void OnLineRead(int aFd, nsDependentCSubstring& aMessage);
virtual void OnFileCanWriteWithoutBlocking(int aFd);
virtual void OnError();
private:
bool OpenSocket();
@ -155,8 +156,6 @@ private:
VolumeArray mVolumeArray;
CommandQueue mCommands;
bool mCommandPending;
char mRcvBuf[kRcvBufSize];
size_t mRcvIdx;
MessageLoopForIO::FileDescriptorWatcher mReadWatcher;
MessageLoopForIO::FileDescriptorWatcher mWriteWatcher;
RefPtr<VolumeResponseCallback> mBroadcastCallback;

View File

@ -84,10 +84,10 @@ namespace mozilla {
namespace ipc {
NetdClient::NetdClient()
: mSocket(INVALID_SOCKET)
: LineWatcher('\0', MAX_COMMAND_SIZE)
, mSocket(INVALID_SOCKET)
, mIOLoop(MessageLoopForIO::current())
, mCurrentWriteOffset(0)
, mReceivedIndex(0)
, mReConnectTimes(0)
{
MOZ_COUNT_CTOR(NetdClient);
@ -147,63 +147,25 @@ NetdClient::OpenSocket()
return true;
}
void
NetdClient::OnFileCanReadWithoutBlocking(int aFd)
void NetdClient::OnLineRead(int aFd, nsDependentCSubstring& aMessage)
{
ssize_t length = 0;
// Set errno to 0 first. For preventing to use the stale version of errno.
errno = 0;
// We found a line terminator. Each line is formatted as an
// integer response code followed by the rest of the line.
// Fish out the response code.
int responseCode = strtol(aMessage.Data(), nullptr, 10);
// TODO, Bug 783966, handle InterfaceChange(600) and BandwidthControl(601).
if (!errno && responseCode < 600) {
NetdCommand* response = new NetdCommand();
// Passing all the response message, including the line terminator.
response->mSize = aMessage.Length();
memcpy(response->mData, aMessage.Data(), aMessage.Length());
gNetdConsumer->MessageReceived(response);
}
MOZ_ASSERT(aFd == mSocket.get());
while (true) {
errno = 0;
MOZ_ASSERT(mReceivedIndex < MAX_COMMAND_SIZE);
length = read(aFd, &mReceiveBuffer[mReceivedIndex], MAX_COMMAND_SIZE - mReceivedIndex);
MOZ_ASSERT(length <= ssize_t(MAX_COMMAND_SIZE - mReceivedIndex));
if (length <= 0) {
if (length == -1) {
if (errno == EINTR) {
continue; // retry system call when interrupted
}
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return; // no data available: return and re-poll
}
}
LOG("Can't read from netd error: %d (%s) length: %d", errno, strerror(errno), length);
// At this point, assume that we can't actually access
// the socket anymore, and start a reconnect loop.
Restart();
return;
}
while (length-- > 0) {
MOZ_ASSERT(mReceivedIndex < MAX_COMMAND_SIZE);
if (mReceiveBuffer[mReceivedIndex] == '\0') {
// We found a line terminator. Each line is formatted as an
// integer response code followed by the rest of the line.
// Fish out the response code.
errno = 0;
int responseCode = strtol(mReceiveBuffer, nullptr, 10);
// TODO, Bug 783966, handle InterfaceChange(600) and BandwidthControl(601).
if (!errno && responseCode < 600) {
NetdCommand* response = new NetdCommand();
// Passing all the response message, including the line terminator.
response->mSize = mReceivedIndex + 1;
memcpy(response->mData, mReceiveBuffer, mReceivedIndex + 1);
gNetdConsumer->MessageReceived(response);
}
if (!responseCode || errno) {
LOG("Can't parse netd's response: %d (%s)", errno, strerror(errno));
}
// There is data in the receive buffer beyond the current line.
// Shift it down to the beginning.
if (length > 0) {
MOZ_ASSERT(mReceivedIndex < (MAX_COMMAND_SIZE - 1));
memmove(&mReceiveBuffer[0], &mReceiveBuffer[mReceivedIndex + 1], length);
}
mReceivedIndex = 0;
} else {
mReceivedIndex++;
}
}
if (!responseCode) {
LOG("Can't parse netd's response");
}
}
@ -215,7 +177,7 @@ NetdClient::OnFileCanWriteWithoutBlocking(int aFd)
}
void
NetdClient::Restart()
NetdClient::OnError()
{
MOZ_ASSERT(MessageLoop::current() == XRE_GetIOMessageLoop());
@ -223,7 +185,6 @@ NetdClient::Restart()
mWriteWatcher.StopWatchingFileDescriptor();
mSocket.dispose();
mReceivedIndex = 0;
mCurrentWriteOffset = 0;
mCurrentNetdCommand = nullptr;
while (!mOutgoingQ.empty()) {
@ -299,7 +260,7 @@ NetdClient::WriteNetdCommand()
write_amount);
if (written < 0) {
LOG("Cannot write to network, error %d\n", (int) written);
Restart();
OnError();
return;
}

View File

@ -38,7 +38,7 @@ class NetdWriteTask : public Task
virtual void Run();
};
class NetdClient : public MessageLoopForIO::Watcher,
class NetdClient : public MessageLoopForIO::LineWatcher,
public RefCounted<NetdClient>
{
public:
@ -51,8 +51,8 @@ public:
private:
void WriteNetdCommand();
void Restart();
virtual void OnFileCanReadWithoutBlocking(int aFd);
virtual void OnError();
virtual void OnLineRead(int aFd, nsDependentCSubstring& aMessage);
virtual void OnFileCanWriteWithoutBlocking(int aFd);
bool OpenSocket();
@ -61,10 +61,8 @@ private:
MessageLoopForIO::FileDescriptorWatcher mWriteWatcher;
ScopedClose mSocket;
NetdCommandQueue mOutgoingQ;
char mReceiveBuffer[MAX_COMMAND_SIZE];
nsAutoPtr<NetdCommand> mCurrentNetdCommand;
size_t mCurrentWriteOffset;
size_t mReceivedIndex;
size_t mReConnectTimes;
};