diff --git a/netwerk/base/src/nsFileTransport.cpp b/netwerk/base/src/nsFileTransport.cpp index 62ce0c4a81f0..9d1a861e2bee 100644 --- a/netwerk/base/src/nsFileTransport.cpp +++ b/netwerk/base/src/nsFileTransport.cpp @@ -420,6 +420,12 @@ nsFileTransport::OnFull(nsIBuffer *buffer) { return NS_OK; } +NS_IMETHODIMP +nsFileTransport::OnWrite(nsIBuffer* aBuffer, PRUint32 aCount) +{ + return NS_OK; +} + NS_IMETHODIMP nsFileTransport::OnEmpty(nsIBuffer *buffer) { PR_EnterMonitor(mMonitor); diff --git a/netwerk/base/src/nsFileTransport.h b/netwerk/base/src/nsFileTransport.h index 3623c30ede80..9dea64890678 100644 --- a/netwerk/base/src/nsFileTransport.h +++ b/netwerk/base/src/nsFileTransport.h @@ -67,6 +67,7 @@ public: // nsIBufferObserver methods: NS_IMETHOD OnFull(nsIBuffer* buffer); + NS_IMETHOD OnWrite(nsIBuffer* aBuffer, PRUint32 aCount); NS_IMETHOD OnEmpty(nsIBuffer* buffer); // nsFileTransport methods: diff --git a/netwerk/base/src/nsSocketTransport.cpp b/netwerk/base/src/nsSocketTransport.cpp index 2ee621509b26..768c07001451 100644 --- a/netwerk/base/src/nsSocketTransport.cpp +++ b/netwerk/base/src/nsSocketTransport.cpp @@ -22,15 +22,12 @@ #include "nspr.h" #include "nsCRT.h" -#include "nsCOMPtr.h" #include "nsIServiceManager.h" #include "nscore.h" #include "netCore.h" #include "nsIStreamListener.h" #include "nsSocketTransport.h" #include "nsSocketTransportService.h" -#include "nsIBuffer.h" -#include "nsIBufferInputStream.h" #include "nsIBufferOutputStream.h" #include "nsAutoLock.h" @@ -50,8 +47,6 @@ nsSocketState gStateTable[eSocketOperation_Max][eSocketState_Max] = { eSocketState_Error, // WaitConnect -> Error eSocketState_Error, // Connected -> Error eSocketState_Error, // WaitReadWrite -> Error - eSocketState_Error, // DoneRead -> Error - eSocketState_Error, // DoneWrite -> Error eSocketState_Error, // Done -> Error eSocketState_Error, // Timeout -> Error eSocketState_Error // Error -> Error @@ -64,8 +59,6 @@ nsSocketState gStateTable[eSocketOperation_Max][eSocketState_Max] = { eSocketState_Connected, // WaitConnect -> Connected eSocketState_Connected, // Connected -> Done eSocketState_Error, // WaitReadWrite -> Error - eSocketState_Error, // DoneRead -> Error - eSocketState_Error, // DoneWrite -> Error eSocketState_Connected, // Done -> Connected eSocketState_Error, // Timeout -> Error eSocketState_Closed // Error -> Closed @@ -78,8 +71,6 @@ nsSocketState gStateTable[eSocketOperation_Max][eSocketState_Max] = { eSocketState_Connected, // WaitConenct -> Connected eSocketState_WaitReadWrite, // Connected -> WaitReadWrite eSocketState_Done, // WaitReadWrite -> Done - eSocketState_Connected, // DoneRead -> Connected - eSocketState_Connected, // DoneWrite -> Connected eSocketState_Connected, // Done -> Connected eSocketState_Error, // Timeout -> Error eSocketState_Closed // Error -> Closed @@ -131,21 +122,15 @@ nsSocketTransport::nsSocketTransport() mLock = nsnull; mSuspendCount = 0; - mIsWaitingForRead = PR_FALSE; + + mReadWriteState = 0; + SetReadType (eSocketRead_None); + SetWriteType(eSocketWrite_None); mCurrentState = eSocketState_Created; mOperation = eSocketOperation_None; mSelectFlags = 0; - mReadBuffer = nsnull; - mReadStream = nsnull; - mReadContext = nsnull; - mReadListener = nsnull; - - mWriteStream = nsnull; - mWriteContext = nsnull; - mWriteObserver = nsnull; - mWriteCount = 0; mSourceOffset = 0; mService = nsnull; @@ -186,15 +171,21 @@ nsSocketTransport::~nsSocketTransport() PR_LOG(gSocketLog, PR_LOG_DEBUG, ("Deleting nsSocketTransport [this=%x].\n", this)); - NS_IF_RELEASE(mReadListener); - NS_IF_RELEASE(mReadStream); - NS_IF_RELEASE(mReadBuffer); - NS_IF_RELEASE(mReadContext); + // Release the nsCOMPtrs... + // + // It is easier to debug problems if these are released before the + // nsSocketTransport context is lost... + // + mReadListener = null_nsCOMPtr(); + mReadStream = null_nsCOMPtr(); + mReadContext = null_nsCOMPtr(); + mReadBuffer = null_nsCOMPtr(); - NS_IF_RELEASE(mWriteObserver); - NS_IF_RELEASE(mWriteStream); - NS_IF_RELEASE(mWriteContext); - + mWriteObserver = null_nsCOMPtr(); + mWriteStream = null_nsCOMPtr(); + mWriteContext = null_nsCOMPtr(); + mWriteBuffer = null_nsCOMPtr(); + NS_IF_RELEASE(mService); if (mHostName) { @@ -271,6 +262,16 @@ nsresult nsSocketTransport::Process(PRInt16 aSelectFlags) "CurrentState = %d\n", this, aSelectFlags, mCurrentState)); + // + // Check for an error during PR_Poll(...) + // + if (PR_POLL_EXCEPT & aSelectFlags) { + PR_LOG(gSocketLog, PR_LOG_ERROR, + ("Operation failed via PR_POLL_EXCEPT. [this=%x].\n", this)); + // An error has occurred, so cancel the read and/or write operation... + mCurrentState = eSocketState_Error; + } + while (!done) { // @@ -292,54 +293,83 @@ nsresult nsSocketTransport::Process(PRInt16 aSelectFlags) break; case eSocketState_Connected: - // Fire a notification for read... - if (mReadListener) { - mReadListener->OnStartRequest(this, mReadContext); + // + // A connection has been established with the server + // + mSelectFlags = PR_POLL_EXCEPT; + if (GetReadType() != eSocketRead_None) { + // Set the select flags for non-blocking reads... + mSelectFlags |= PR_POLL_READ; + + // Fire a notification that the read has started... + if (mReadListener) { + mReadListener->OnStartRequest(this, mReadContext); + } } - // Fire a notification for write... - if (mWriteObserver) { - mWriteObserver->OnStartRequest(this, mWriteContext); + if (GetWriteType() != eSocketWrite_None) { + // Set the select flags for non-blocking writes... + mSelectFlags |= PR_POLL_WRITE; + + // Fire a notification that the write has started... + if (mWriteObserver) { + mWriteObserver->OnStartRequest(this, mWriteContext); + } } break; - case eSocketState_Done: case eSocketState_Error: - case eSocketState_DoneRead: - case eSocketState_DoneWrite: PR_LOG(gSocketLog, PR_LOG_DEBUG, - ("Transport [this=%x] is in done state %d.\n", this, mCurrentState)); + ("Transport [this=%x] is in error state.\n", this)); - // Fire a notification that the read has finished... - if (eSocketState_DoneWrite != mCurrentState) { + // Cancel any read and/or write requests... + SetFlag(eSocketRead_Done); + SetFlag(eSocketWrite_Done); + // + // Fall into the Done state... + // + case eSocketState_Done: + PR_LOG(gSocketLog, PR_LOG_DEBUG, + ("Transport [this=%x] is in done state.\n", this)); + + if (GetFlag(eSocketRead_Done)) { + // Fire a notification that the read has finished... if (mReadListener) { mReadListener->OnStopRequest(this, mReadContext, rv, nsnull); - NS_RELEASE(mReadListener); - NS_IF_RELEASE(mReadContext); + mReadListener = null_nsCOMPtr(); + mReadContext = null_nsCOMPtr(); } - NS_IF_RELEASE(mReadStream); + mReadStream = null_nsCOMPtr(); // XXX: The buffer should be reused... - NS_IF_RELEASE(mReadBuffer); + mReadBuffer = null_nsCOMPtr(); + SetReadType(eSocketRead_None); + ClearFlag(eSocketRead_Done); } - // Fire a notification that the write has finished... - if (eSocketState_DoneRead != mCurrentState) { + if (GetFlag(eSocketWrite_Done)) { + // Fire a notification that the write has finished... if (mWriteObserver) { mWriteObserver->OnStopRequest(this, mWriteContext, rv, nsnull); - NS_RELEASE(mWriteObserver); - NS_IF_RELEASE(mWriteContext); + mWriteObserver = null_nsCOMPtr(); + mWriteContext = null_nsCOMPtr(); } - NS_IF_RELEASE(mWriteStream); + mWriteStream = null_nsCOMPtr(); + mWriteBuffer = null_nsCOMPtr(); + SetWriteType(eSocketWrite_None); + ClearFlag(eSocketWrite_Done); } // - // Set up the connection for the next operation... + // Are all read and write requests done? // - if (mReadStream || mWriteStream) { - mCurrentState = eSocketState_WaitReadWrite; - } else { + if ((GetReadType() == eSocketRead_None) && + (GetWriteType() == eSocketWrite_None)) + { mCurrentState = gStateTable[mOperation][mCurrentState]; mOperation = eSocketOperation_None; done = PR_TRUE; + } else { + // Still reading or writing... + mCurrentState = eSocketState_WaitReadWrite; } continue; @@ -352,19 +382,20 @@ nsresult nsSocketTransport::Process(PRInt16 aSelectFlags) break; case eSocketState_WaitReadWrite: + // Process the read request... if (mReadStream) { rv = doRead(aSelectFlags); if (NS_OK == rv) { - mCurrentState = eSocketState_DoneRead; - continue; + SetFlag(eSocketRead_Done); + break; } } - + // Process the write request... if (NS_SUCCEEDED(rv) && mWriteStream) { rv = doWrite(aSelectFlags); if (NS_OK == rv) { - mCurrentState = eSocketState_DoneWrite; - continue; + SetFlag(eSocketWrite_Done); + break; } } break; @@ -502,11 +533,11 @@ nsresult nsSocketTransport::doConnection(PRInt16 aSelectFlags) "aSelectFlags = %x.\n", this, aSelectFlags)); - // - // Step 1: - // Create a new TCP socket structure (if necessary)... - // if (!mSocketFD) { + // + // Step 1: + // Create a new TCP socket structure... + // mSocketFD = PR_NewTCPSocket(); if (mSocketFD) { PRSocketOptionData opt; @@ -530,48 +561,46 @@ nsresult nsSocketTransport::doConnection(PRInt16 aSelectFlags) else { rv = NS_ERROR_OUT_OF_MEMORY; } - } - // - // Step 2: - // Initiate the connect() to the host... - // - // This is only done the first time doConnection(...) is called. - // If aSelectFlags == 0 then this is the first time... Otherwise, - // PR_Poll(...) would have set the flags non-zero. - // - if (NS_SUCCEEDED(rv) && (0 == aSelectFlags)) { - status = PR_Connect(mSocketFD, &mNetAddress, gConnectTimeout); - if (PR_SUCCESS != status) { - PRErrorCode code = PR_GetError(); - // - // If the PR_Connect(...) would block, then return WOULD_BLOCK... - // It is the callers responsibility to place the transport on the - // select list of the transport thread... - // - if ((PR_WOULD_BLOCK_ERROR == code) || - (PR_IN_PROGRESS_ERROR == code)) { + // + // Step 2: + // Initiate the connect() to the host... + // + // This is only done the first time doConnection(...) is called. + // + if (NS_SUCCEEDED(rv)) { + status = PR_Connect(mSocketFD, &mNetAddress, gConnectTimeout); + if (PR_SUCCESS != status) { + PRErrorCode code = PR_GetError(); + // + // If the PR_Connect(...) would block, then return WOULD_BLOCK... + // It is the callers responsibility to place the transport on the + // select list of the transport thread... + // + if ((PR_WOULD_BLOCK_ERROR == code) || + (PR_IN_PROGRESS_ERROR == code)) { - // Set up the select flags for connect... - mSelectFlags = (PR_POLL_READ | PR_POLL_EXCEPT | PR_POLL_WRITE); - rv = NS_BASE_STREAM_WOULD_BLOCK; - } - // - // If the socket is already connected, then return success... - // - else if (PR_IS_CONNECTED_ERROR == code) { - rv = NS_OK; - } - // - // The connection was refused... - // - else { - // Connection refused... - PR_LOG(gSocketLog, PR_LOG_ERROR, - ("Connection Refused [this=%x]. PRErrorCode = %x\n", - this, code)); + // Set up the select flags for connect... + mSelectFlags = (PR_POLL_READ | PR_POLL_EXCEPT | PR_POLL_WRITE); + rv = NS_BASE_STREAM_WOULD_BLOCK; + } + // + // If the socket is already connected, then return success... + // + else if (PR_IS_CONNECTED_ERROR == code) { + rv = NS_OK; + } + // + // The connection was refused... + // + else { + // Connection refused... + PR_LOG(gSocketLog, PR_LOG_ERROR, + ("Connection Refused [this=%x]. PRErrorCode = %x\n", + this, code)); - rv = NS_ERROR_CONNECTION_REFUSED; + rv = NS_ERROR_CONNECTION_REFUSED; + } } } } @@ -579,7 +608,7 @@ nsresult nsSocketTransport::doConnection(PRInt16 aSelectFlags) // Step 3: // Process the flags returned by PR_Poll() if any... // - else if (NS_SUCCEEDED(rv) && aSelectFlags) { + else if (aSelectFlags) { if (PR_POLL_EXCEPT & aSelectFlags) { PR_LOG(gSocketLog, PR_LOG_ERROR, ("Connection Refused via PR_POLL_EXCEPT. [this=%x].\n", this)); @@ -595,6 +624,8 @@ nsresult nsSocketTransport::doConnection(PRInt16 aSelectFlags) else if (PR_POLL_WRITE & aSelectFlags) { rv = NS_OK; } + } else { + rv = NS_BASE_STREAM_WOULD_BLOCK; } PR_LOG(gSocketLog, PR_LOG_DEBUG, @@ -651,11 +682,56 @@ nsReadFromSocket(void* closure, PR_LOG(gSocketLog, PR_LOG_DEBUG, ("nsReadFromSocket [fd=%x]. rv = %x. Buffer space = %d. Bytes read =%d\n", - fd, rv, count, *readCount)); + fd, rv, count, *readCount)); return rv; } +NS_METHOD +nsWriteToSocket(void* closure, + const char* fromRawSegment, + PRUint32 toOffset, + PRUint32 count, + PRUint32 *writeCount) +{ + nsresult rv = NS_OK; + PRInt32 len; + PRErrorCode code; + + PRFileDesc* fd = (PRFileDesc*)closure; + + *writeCount = 0; + + len = PR_Write(fd, fromRawSegment, count); + if (len > 0) { + *writeCount = (PRUint32)len; + } + // + // Error... + // + else { + code = PR_GetError(); + + if (PR_WOULD_BLOCK_ERROR == code) { + rv = NS_BASE_STREAM_WOULD_BLOCK; + } + else { + PR_LOG(gSocketLog, PR_LOG_ERROR, + ("PR_Write() failed. PRErrorCode = %x\n", code)); + + // XXX: What should this error code be? + rv = NS_ERROR_FAILURE; + } + } + + PR_LOG(gSocketLog, PR_LOG_DEBUG, + ("nsWriteToSocket [fd=%x]. rv = %x. Buffer space = %d. Bytes written =%d\n", + fd, rv, count, *writeCount)); + + return rv; + +} + //----- // // doRead: @@ -676,57 +752,49 @@ nsresult nsSocketTransport::doRead(PRInt16 aSelectFlags) nsresult rv = NS_OK; NS_ASSERTION(eSocketState_WaitReadWrite == mCurrentState, "Wrong state."); + NS_ASSERTION(GetReadType() != eSocketRead_None, "Bad Read Type!"); - PR_LOG(gSocketLog, PR_LOG_DEBUG, - ("+++ Entering nsSocketTransport::doRead() [this=%x].\t" - "aSelectFlags = %x.\n", - this, aSelectFlags)); - - // - // Check for an error during PR_Poll(...) - // - if (PR_POLL_EXCEPT & aSelectFlags) { - PR_LOG(gSocketLog, PR_LOG_ERROR, - ("PR_Read() failed via PR_POLL_EXCEPT. [this=%x].\n", this)); - - // XXX: What should this error code be? - rv = NS_ERROR_FAILURE; - } // // Fill the stream with as much data from the network as possible... // // - else { - totalBytesWritten = 0; - rv = mReadBuffer->WriteSegments(nsReadFromSocket, (void*)mSocketFD, - MAX_IO_TRANSFER_SIZE, &totalBytesWritten); - PR_LOG(gSocketLog, PR_LOG_DEBUG, - ("FillStream [fd=%x]. rv = %x. Bytes read =%d\n", - mSocketFD, rv, totalBytesWritten)); + totalBytesWritten = 0; + // + // Release the transport lock... WriteSegments(...) aquires the nsBuffer + // lock which could cause a deadlock by blocking the socket transport + // thread + // + PR_Unlock(mLock); + rv = mReadBuffer->WriteSegments(nsReadFromSocket, (void*)mSocketFD, + MAX_IO_TRANSFER_SIZE, &totalBytesWritten); + PR_Lock(mLock); - // - // Deal with the possible return values... - // - if (NS_BASE_STREAM_FULL == rv) { - rv = NS_BASE_STREAM_WOULD_BLOCK; - } - else if (NS_BASE_STREAM_EOF == rv) { - mSelectFlags &= (~PR_POLL_READ); - rv = NS_OK; - } - else if (NS_SUCCEEDED(rv)) { - // continue to return WOULD_BLOCK until we've completely finished this read - rv = NS_BASE_STREAM_WOULD_BLOCK; - } + PR_LOG(gSocketLog, PR_LOG_DEBUG, + ("WriteSegments [fd=%x]. rv = %x. Bytes read =%d\n", + mSocketFD, rv, totalBytesWritten)); - // - // Fire a single OnDataAvaliable(...) notification once as much data has - // been filled into the stream as possible... - // - if (totalBytesWritten && mReadListener) { + // + // Deal with the possible return values... + // + if (NS_BASE_STREAM_EOF == rv) { + mSelectFlags &= (~PR_POLL_READ); + rv = NS_OK; + } + else if (NS_SUCCEEDED(rv)) { + // continue to return WOULD_BLOCK until we've completely finished this read + rv = NS_BASE_STREAM_WOULD_BLOCK; + } + + // + // Fire a single OnDataAvaliable(...) notification once as much data has + // been filled into the stream as possible... + // + if (totalBytesWritten) { + if (mReadListener) { nsresult rv1; - rv1 = mReadListener->OnDataAvailable(this, mReadContext, mReadStream, mSourceOffset, + rv1 = mReadListener->OnDataAvailable(this, mReadContext, mReadStream, + mSourceOffset, totalBytesWritten); // // If the consumer returns failure, then cancel the operation... @@ -734,15 +802,11 @@ nsresult nsSocketTransport::doRead(PRInt16 aSelectFlags) if (NS_FAILED(rv1)) { rv = rv1; } - mSourceOffset += totalBytesWritten; + } else if (GetReadType() == eSocketRead_Sync) { + nsAutoCMonitor mon(mReadBuffer); + mon.Notify(); } - } - - // - // Set up the select flags for read... - // - if (NS_BASE_STREAM_WOULD_BLOCK == rv) { - mSelectFlags |= (PR_POLL_READ | PR_POLL_EXCEPT); + mSourceOffset += totalBytesWritten; } PR_LOG(gSocketLog, PR_LOG_DEBUG, @@ -770,12 +834,11 @@ nsresult nsSocketTransport::doRead(PRInt16 aSelectFlags) //----- nsresult nsSocketTransport::doWrite(PRInt16 aSelectFlags) { - PRUint32 bytesRead, totalBytesRead; - PRInt32 maxBytesToRead, len; - PRErrorCode code; + PRUint32 totalBytesRead; nsresult rv = NS_OK; NS_ASSERTION(eSocketState_WaitReadWrite == mCurrentState, "Wrong state."); + NS_ASSERTION(GetWriteType() != eSocketWrite_None, "Bad Write Type!"); PR_LOG(gSocketLog, PR_LOG_DEBUG, ("+++ Entering nsSocketTransport::doWrite() [this=%x].\t" @@ -783,67 +846,14 @@ nsresult nsSocketTransport::doWrite(PRInt16 aSelectFlags) "mWriteCount = %d\n", this, aSelectFlags, mWriteCount)); - // - // Check for an error during PR_Poll(...) - // - if (PR_POLL_EXCEPT & aSelectFlags) { - PR_LOG(gSocketLog, PR_LOG_ERROR, - ("PR_Write() failed via PR_POLL_EXCEPT. [this=%x].\n", this)); + totalBytesRead = 0; - // XXX: What should this error code be? - rv = NS_ERROR_FAILURE; + if (mWriteBuffer) { + rv = doWriteFromBuffer(&totalBytesRead); } + else { - totalBytesRead = 0; - while (NS_OK == rv) { - // Determine the amount of data to read from the input stream... - if ((mWriteCount > 0) && (mWriteCount < MAX_IO_TRANSFER_SIZE)) { - maxBytesToRead = mWriteCount; - } else { - maxBytesToRead = sizeof(gIOBuffer); - } - rv = mWriteStream->Read(gIOBuffer, maxBytesToRead, &bytesRead); - if (NS_SUCCEEDED(rv) && bytesRead) { - // Update the counters... - totalBytesRead += bytesRead; - if (mWriteCount > 0) { - mWriteCount -= bytesRead; - } - // Write the data to the socket... - len = PR_Write(mSocketFD, gIOBuffer, bytesRead); - - if (len < 0) { - code = PR_GetError(); - - if (PR_WOULD_BLOCK_ERROR == code) { - rv = NS_BASE_STREAM_WOULD_BLOCK; - } - else { - PR_LOG(gSocketLog, PR_LOG_ERROR, - ("PR_Write() failed. [this=%x]. PRErrorCode = %x\n", - this, code)); - - // XXX: What should this error code be? - rv = NS_ERROR_FAILURE; - } - } - } - // - // The write operation has completed... - // - if ((mWriteCount == 0) || (NS_BASE_STREAM_EOF == rv) ) { - mSelectFlags &= (~PR_POLL_WRITE); - rv = NS_OK; - break; - } - } - } - - // - // Set up the select flags for connect... - // - if (NS_BASE_STREAM_WOULD_BLOCK == rv) { - mSelectFlags |= (PR_POLL_WRITE | PR_POLL_EXCEPT); + rv = doWriteFromStream(&totalBytesRead); } PR_LOG(gSocketLog, PR_LOG_DEBUG, @@ -855,6 +865,117 @@ nsresult nsSocketTransport::doWrite(PRInt16 aSelectFlags) } +nsresult nsSocketTransport::doWriteFromBuffer(PRUint32 *aCount) +{ + nsresult rv; + + *aCount = 0; + // + // Release the transport lock... ReadSegments(...) aquires the nsBuffer + // lock which could cause a deadlock by blocking the socket transport + // thread + // + PR_Unlock(mLock); + rv = mWriteBuffer->ReadSegments(nsWriteToSocket, (void*)mSocketFD, + MAX_IO_TRANSFER_SIZE, aCount); + PR_Lock(mLock); + + mWriteCount -= *aCount; + + PR_LOG(gSocketLog, PR_LOG_DEBUG, + ("ReadSegments [fd=%x]. rv = %x. Bytes written =%d\n", + mSocketFD, rv, *aCount)); + + if (NS_BASE_STREAM_EOF == rv) { + // + // The write operation has completed... + // + mSelectFlags &= (~PR_POLL_WRITE); + rv = NS_OK; + } + else if (NS_SUCCEEDED(rv)) { + // + // If the buffer is empty, then notify the reader and stop polling + // for write until there is data in the buffer. See the OnWrite() + // notification... + // + if (mWriteCount == 0) { + SetFlag(eSocketWrite_Wait); + mSelectFlags &= (~PR_POLL_WRITE); + + PR_Unlock(mLock); + { + nsAutoCMonitor mon(mWriteBuffer); + mon.Notify(); + } + PR_Lock(mLock); + } + + // continue to return WOULD_BLOCK until we've completely finished + // this write... + rv = NS_BASE_STREAM_WOULD_BLOCK; + } + + return rv; +} + + +nsresult nsSocketTransport::doWriteFromStream(PRUint32 *aCount) +{ + PRUint32 bytesRead; + PRInt32 maxBytesToRead, len; + PRErrorCode code; + nsresult rv = NS_OK; + + *aCount = 0; + while (NS_OK == rv) { + // Determine the amount of data to read from the input stream... + if ((mWriteCount > 0) && (mWriteCount < MAX_IO_TRANSFER_SIZE)) { + maxBytesToRead = mWriteCount; + } else { + maxBytesToRead = sizeof(gIOBuffer); + } + + bytesRead = 0; + rv = mWriteStream->Read(gIOBuffer, maxBytesToRead, &bytesRead); + if (NS_SUCCEEDED(rv) && bytesRead) { + // Update the counters... + *aCount += bytesRead; + if (mWriteCount > 0) { + mWriteCount -= bytesRead; + } + // Write the data to the socket... + len = PR_Write(mSocketFD, gIOBuffer, bytesRead); + + if (len < 0) { + code = PR_GetError(); + + if (PR_WOULD_BLOCK_ERROR == code) { + rv = NS_BASE_STREAM_WOULD_BLOCK; + } + else { + PR_LOG(gSocketLog, PR_LOG_ERROR, + ("PR_Write() failed. [this=%x]. PRErrorCode = %x\n", + this, code)); + + // XXX: What should this error code be? + rv = NS_ERROR_FAILURE; + } + } + } + // + // The write operation has completed... + // + if ((mWriteCount == 0) || (NS_BASE_STREAM_EOF == rv) ) { + mSelectFlags &= (~PR_POLL_WRITE); + rv = NS_OK; + break; + } + } + return rv; +} + + nsresult nsSocketTransport::CloseConnection(void) { PRStatus status; @@ -998,19 +1119,45 @@ nsSocketTransport::Resume(void) // -------------------------------------------------------------------------- // NS_IMETHODIMP -nsSocketTransport::OnFull(nsIBuffer* buffer) +nsSocketTransport::OnFull(nsIBuffer* aBuffer) { PR_LOG(gSocketLog, PR_LOG_DEBUG, - ("nsSocketTransport::OnFull() [this=%x].\n", this)); + ("nsSocketTransport::OnFull() [this=%x] nsIBuffer=%x.\n", + this, aBuffer)); - // - // Block the transport... It is assumed that the calling thread *is* - // holding the socket transport lock so Suspend(...) cannot be called. - // However, it is safe to directly modify the suspend count... - // - if (!mIsWaitingForRead) { - mSuspendCount += 1; - mIsWaitingForRead = PR_TRUE; + if (aBuffer == mReadBuffer) { + NS_ASSERTION(!GetFlag(eSocketRead_Wait), "Already waiting!"); + + SetFlag(eSocketRead_Wait); + mSelectFlags &= (~PR_POLL_READ); + } + + return NS_OK; +} + + +NS_IMETHODIMP +nsSocketTransport::OnWrite(nsIBuffer* aBuffer, PRUint32 aCount) +{ + nsresult rv = NS_OK; + + PR_LOG(gSocketLog, PR_LOG_DEBUG, + ("nsSocketTransport::OnWrite() [this=%x]. nsIBuffer=%x Count=%d\n", + this, aBuffer, aCount)); + + if (aBuffer == mWriteBuffer) { + // Enter the socket transport lock... + nsAutoLock lock(mLock); + + mWriteCount += aCount; + if (GetFlag(eSocketWrite_Wait)) { + ClearFlag(eSocketWrite_Wait); + mSelectFlags |= PR_POLL_WRITE; + + // Start the crank. + mOperation = eSocketOperation_ReadWrite; + rv = mService->AddToWorkQ(this); + } } return NS_OK; @@ -1018,21 +1165,23 @@ nsSocketTransport::OnFull(nsIBuffer* buffer) NS_IMETHODIMP -nsSocketTransport::OnEmpty(nsIBuffer* buffer) +nsSocketTransport::OnEmpty(nsIBuffer* aBuffer) { nsresult rv = NS_OK; PR_LOG(gSocketLog, PR_LOG_DEBUG, - ("nsSocketTransport::OnEmpty() [this=%x].\n", this)); + ("nsSocketTransport::OnEmpty() [this=%x] nsIBuffer=%x.\n", + this, aBuffer)); - // - // Unblock the transport... It is assumed that the calling thread is not - // holding the socket transport lock so it is safe to call Resume() - // directly... - // - if (mIsWaitingForRead) { - rv = Resume(); - mIsWaitingForRead = PR_FALSE; + if (aBuffer == mReadBuffer) { + // Enter the socket transport lock... + nsAutoLock lock(mLock); + + if (GetFlag(eSocketRead_Wait)) { + ClearFlag(eSocketRead_Wait); + mSelectFlags |= PR_POLL_READ; + rv = mService->AddToWorkQ(this); + } } return rv; @@ -1057,8 +1206,7 @@ nsSocketTransport::AsyncRead(PRUint32 startPosition, PRInt32 readCount, { // XXX deal with startPosition and readCount parameters nsresult rv = NS_OK; - nsCOMPtr eventQ; - + // Enter the socket transport lock... nsAutoLock lock(mLock); @@ -1068,39 +1216,46 @@ nsSocketTransport::AsyncRead(PRUint32 startPosition, PRInt32 readCount, this, readCount)); // If a read is already in progress then fail... - if (mReadListener) { + if (GetReadType() != eSocketRead_None) { rv = NS_ERROR_IN_PROGRESS; } - // Get the event queue of the current thread... - NS_WITH_SERVICE(nsIEventQueueService, eventQService, kEventQueueService, &rv); - if (NS_SUCCEEDED(rv)) { - rv = eventQService->GetThreadEventQueue(PR_CurrentThread(), - getter_AddRefs(eventQ)); - } - // Create a new non-blocking input stream for reading data into... if (NS_SUCCEEDED(rv) && !mReadStream) { - rv = NS_NewBuffer(&mReadBuffer, MAX_IO_BUFFER_SIZE/2, + rv = NS_NewBuffer(getter_AddRefs(mReadBuffer), MAX_IO_BUFFER_SIZE/2, 2*MAX_IO_BUFFER_SIZE, this); if (NS_SUCCEEDED(rv)) { - rv = NS_NewBufferInputStream(&mReadStream, mReadBuffer, PR_FALSE); + nsCOMPtr newStream; + + rv = NS_NewBufferInputStream(getter_AddRefs(newStream), + mReadBuffer, PR_FALSE); + mReadStream = newStream; + } + } + + // Create a marshalling stream listener to receive notifications... + if (NS_SUCCEEDED(rv)) { + nsCOMPtr eventQ; + + // Get the event queue of the current thread... + NS_WITH_SERVICE(nsIEventQueueService, eventQService, kEventQueueService, &rv); + if (NS_SUCCEEDED(rv)) { + rv = eventQService->GetThreadEventQueue(PR_CurrentThread(), + getter_AddRefs(eventQ)); + } + if (NS_SUCCEEDED(rv)) { + rv = NS_NewAsyncStreamListener(getter_AddRefs(mReadListener), + eventQ, aListener); } } if (NS_SUCCEEDED(rv)) { // Store the context used for this read... - NS_IF_RELEASE(mReadContext); mReadContext = aContext; - NS_IF_ADDREF(mReadContext); - // Create a marshalling stream listener to receive notifications... - rv = NS_NewAsyncStreamListener(&mReadListener, eventQ, aListener); - } - - if (NS_SUCCEEDED(rv)) { mOperation = eSocketOperation_ReadWrite; + SetReadType(eSocketRead_Async); rv = mService->AddToWorkQ(this); } @@ -1131,20 +1286,15 @@ nsSocketTransport::AsyncWrite(nsIInputStream* aFromStream, this, writeCount)); // If a write is already in progress then fail... - if (mWriteStream) { + if (GetWriteType() != eSocketWrite_None) { rv = NS_ERROR_IN_PROGRESS; } if (NS_SUCCEEDED(rv)) { - mWriteStream = aFromStream; - NS_ADDREF(aFromStream); - - NS_IF_RELEASE(mWriteContext); + mWriteStream = aFromStream; mWriteContext = aContext; - NS_IF_ADDREF(mWriteContext); // Create a marshalling stream observer to receive notifications... - NS_IF_RELEASE(mWriteObserver); if (aObserver) { nsCOMPtr eventQ; @@ -1155,7 +1305,8 @@ nsSocketTransport::AsyncWrite(nsIInputStream* aFromStream, getter_AddRefs(eventQ)); } if (NS_SUCCEEDED(rv)) { - rv = NS_NewAsyncStreamObserver(&mWriteObserver, eventQ, aObserver); + rv = NS_NewAsyncStreamObserver(getter_AddRefs(mWriteObserver), + eventQ, aObserver); } } @@ -1164,6 +1315,8 @@ nsSocketTransport::AsyncWrite(nsIInputStream* aFromStream, if (NS_SUCCEEDED(rv)) { mOperation = eSocketOperation_ReadWrite; + SetWriteType(eSocketWrite_Async); + rv = mService->AddToWorkQ(this); } @@ -1190,27 +1343,32 @@ nsSocketTransport::OpenInputStream(PRUint32 startPosition, PRInt32 readCount, this)); // If a read is already in progress then fail... - if (mReadListener) { + if (GetReadType() != eSocketRead_None) { rv = NS_ERROR_IN_PROGRESS; } // Create a new blocking input stream for reading data into... if (NS_SUCCEEDED(rv)) { - NS_IF_RELEASE(mReadStream); - // XXX: The buffer should be reused... - NS_IF_RELEASE(mReadBuffer); - NS_IF_RELEASE(mReadContext); + mReadListener = null_nsCOMPtr(); + mReadContext = null_nsCOMPtr(); - rv = NS_NewBuffer(&mReadBuffer, MAX_IO_BUFFER_SIZE/2, + rv = NS_NewBuffer(getter_AddRefs(mReadBuffer), MAX_IO_BUFFER_SIZE/2, 2*MAX_IO_BUFFER_SIZE, this); if (NS_SUCCEEDED(rv)) { - rv = NS_NewBufferInputStream(&mReadStream, mReadBuffer, PR_TRUE); + nsCOMPtr newStream; + + rv = NS_NewBufferInputStream(getter_AddRefs(newStream), + mReadBuffer, PR_TRUE); + mReadStream = newStream; + *result = newStream; + NS_IF_ADDREF(*result); } } if (NS_SUCCEEDED(rv)) { mOperation = eSocketOperation_ReadWrite; + SetReadType(eSocketRead_Sync); rv = mService->AddToWorkQ(this); } @@ -1238,35 +1396,43 @@ nsSocketTransport::OpenOutputStream(PRUint32 startPosition, nsIOutputStream* *re this)); // If a write is already in progress then fail... - if (mWriteStream) { + if (GetWriteType() != eSocketWrite_None) { rv = NS_ERROR_IN_PROGRESS; } if (NS_SUCCEEDED(rv)) { - NS_IF_RELEASE(mWriteObserver); - NS_IF_RELEASE(mWriteContext); + // No observer or write context is available... + mWriteCount = 0; + mWriteObserver = null_nsCOMPtr(); + mWriteContext = null_nsCOMPtr(); // We want a pipe here so the caller can "write" into one end // and the other end (aWriteStream) gets the data. This data // is then written to the underlying socket when nsSocketTransport::doWrite() // is called. - // XXX not sure if this should be blocking (PR_TRUE) or non-blocking. - nsIBufferOutputStream* out = nsnull; - nsIBufferInputStream* in = nsnull; - rv = NS_NewPipe(&in, &out, - MAX_IO_BUFFER_SIZE, MAX_IO_BUFFER_SIZE, PR_TRUE, nsnull); - // No need to addref since NewPipe(...) already did!!! - mWriteStream = in; - *result = out; - } + nsCOMPtr out; + nsCOMPtr in; + rv = NS_NewPipe(getter_AddRefs(in), getter_AddRefs(out), + MAX_IO_BUFFER_SIZE, MAX_IO_BUFFER_SIZE, PR_TRUE, this); + if (NS_SUCCEEDED(rv)) { + mWriteStream = in; + *result = out; + NS_IF_ADDREF(*result); + + out->GetBuffer(getter_AddRefs(mWriteBuffer)); + } + + SetWriteType(eSocketWrite_Sync); + } +/* if (NS_SUCCEEDED(rv)) { mOperation = eSocketOperation_ReadWrite; // Start the crank. rv = mService->AddToWorkQ(this); } - +*/ PR_LOG(gSocketLog, PR_LOG_DEBUG, ("--- Leaving nsSocketTransport::OpenOutputStream() [this=%x].\t" "rv = %x.\n", diff --git a/netwerk/base/src/nsSocketTransport.h b/netwerk/base/src/nsSocketTransport.h index 30e6e84e0208..ba6573f2cbaf 100644 --- a/netwerk/base/src/nsSocketTransport.h +++ b/netwerk/base/src/nsSocketTransport.h @@ -23,6 +23,7 @@ #include "prio.h" #include "prnetdb.h" +#include "nsCOMPtr.h" #include "nsIChannel.h" #include "nsIBuffer.h" #include "nsIInputStream.h" @@ -48,12 +49,10 @@ enum nsSocketState { eSocketState_WaitConnect = 3, eSocketState_Connected = 4, eSocketState_WaitReadWrite = 5, - eSocketState_DoneRead = 6, - eSocketState_DoneWrite = 7, - eSocketState_Done = 8, - eSocketState_Timeout = 9, - eSocketState_Error = 10, - eSocketState_Max = 11 + eSocketState_Done = 6, + eSocketState_Timeout = 7, + eSocketState_Error = 8, + eSocketState_Max = 9 }; enum nsSocketOperation { @@ -63,6 +62,40 @@ enum nsSocketOperation { eSocketOperation_Max = 3 }; +// +// The following emun provides information about the currently +// active read and/or write requests... +// +// +-------------------------------+ +// | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | +// +-------------------------------+ +// <-----flag bits----><-type bits-> +// +// Bits: +// 0-3: Type (ie. None, Async, Sync) +// 4: Done flag. +// 5: Wait flag. +// 6-7: Unused flags... +// +// +// +enum nsSocketReadWriteInfo { + eSocketRead_None = 0x0000, + eSocketRead_Async = 0x0001, + eSocketRead_Sync = 0x0002, + eSocketRead_Done = 0x0010, + eSocketRead_Wait = 0x0020, + eSocketRead_Type_Mask = 0x000F, + eSocketRead_Flag_Mask = 0x00F0, + + eSocketWrite_None = 0x0000, + eSocketWrite_Async = 0x0100, + eSocketWrite_Sync = 0x0200, + eSocketWrite_Done = 0x1000, + eSocketWrite_Wait = 0x2000, + eSocketWrite_Type_Mask = 0x0F00, + eSocketWrite_Flag_Mask = 0xF000, +}; // Forward declarations... class nsSocketTransportService; @@ -96,8 +129,9 @@ public: NS_IMETHOD GetContentType(char * *aContentType); // nsIBufferObserver methods: - NS_IMETHOD OnFull(nsIBuffer* buffer); - NS_IMETHOD OnEmpty(nsIBuffer* buffer); + NS_IMETHOD OnFull (nsIBuffer* aBuffer); + NS_IMETHOD OnWrite(nsIBuffer* aBuffer, PRUint32 aCount); + NS_IMETHOD OnEmpty(nsIBuffer* aBuffer); // nsSocketTransport methods: nsSocketTransport(); @@ -123,6 +157,34 @@ protected: nsresult doRead(PRInt16 aSelectFlags); nsresult doWrite(PRInt16 aSelectFlags); + nsresult doWriteFromBuffer(PRUint32 *aCount); + nsresult doWriteFromStream(PRUint32 *aCount); + +private: + // Access methods for manipulating the ReadWriteInfo... + inline void SetReadType(nsSocketReadWriteInfo aType) { + mReadWriteState = (mReadWriteState & ~eSocketRead_Type_Mask) | aType; + } + inline PRUint32 GetReadType(void) { + return mReadWriteState & eSocketRead_Type_Mask; + } + inline void SetWriteType(nsSocketReadWriteInfo aType) { + mReadWriteState = (mReadWriteState & ~eSocketWrite_Type_Mask) | aType; + } + inline PRUint32 GetWriteType(void) { + return mReadWriteState & eSocketWrite_Type_Mask; + } + inline void SetFlag(nsSocketReadWriteInfo aFlag) { + mReadWriteState |= aFlag; + } + inline PRUint32 GetFlag(nsSocketReadWriteInfo aFlag) { + return mReadWriteState & aFlag; + } + + inline void ClearFlag(nsSocketReadWriteInfo aFlag) { + mReadWriteState &= ~aFlag; + } + protected: PRCList mListLink; @@ -130,7 +192,8 @@ protected: nsSocketState mCurrentState; nsSocketOperation mOperation; - PRBool mIsWaitingForRead; + PRUint32 mReadWriteState; + PRInt32 mSuspendCount; PRFileDesc* mSocketFD; @@ -140,15 +203,16 @@ protected: char* mHostName; PRInt32 mPort; - nsISupports* mReadContext; - nsIStreamListener* mReadListener; - nsIBufferInputStream* mReadStream; - nsIBuffer* mReadBuffer; + nsCOMPtr mReadContext; + nsCOMPtr mReadListener; + nsCOMPtr mReadStream; + nsCOMPtr mReadBuffer; - PRInt32 mWriteCount; - nsISupports* mWriteContext; - nsIStreamObserver* mWriteObserver; - nsIInputStream* mWriteStream; + PRInt32 mWriteCount; + nsCOMPtr mWriteContext; + nsCOMPtr mWriteObserver; + nsCOMPtr mWriteStream; + nsCOMPtr mWriteBuffer; PRUint32 mSourceOffset; diff --git a/netwerk/base/src/nsSocketTransportService.cpp b/netwerk/base/src/nsSocketTransportService.cpp index 8a54c58b8fa0..36565f7dc16f 100644 --- a/netwerk/base/src/nsSocketTransportService.cpp +++ b/netwerk/base/src/nsSocketTransportService.cpp @@ -252,21 +252,19 @@ nsresult nsSocketTransportService::AddToSelectList(nsSocketTransport* aTransport break; } } + // Initialize/update the info in the entry... + pfd = &mSelectFDSet[i]; + pfd->fd = aTransport->GetSocket();; + pfd->in_flags = aTransport->GetSelectFlags(); + pfd->out_flags = 0; // Add the FileDesc to the PRPollDesc list... if (i == mSelectFDSetCount) { - pfd = &mSelectFDSet[mSelectFDSetCount]; - pfd->fd = aTransport->GetSocket();; - pfd->in_flags = aTransport->GetSelectFlags(); - pfd->out_flags = 0; // Add the transport instance to the corresponding active transport list... NS_ADDREF(aTransport); mActiveTransportList[mSelectFDSetCount] = aTransport; mSelectFDSetCount += 1; } } - else { - rv = NS_ERROR_FAILURE; - } return rv; }