Bug 470716. Make the close-on-completion behavior of nsAsyncStreamCopier configurable. r+sr=bzbarsky.

This commit is contained in:
Michal Novotny 2009-03-30 15:10:13 -04:00
parent e7fbfa373f
commit 8e17d253af
9 changed files with 327 additions and 43 deletions

View File

@ -42,7 +42,7 @@ interface nsIOutputStream;
interface nsIRequestObserver;
interface nsIEventTarget;
[scriptable, uuid(72e515de-a91e-4154-bb78-e5244cbaae74)]
[scriptable, uuid(5a19ca27-e041-4aca-8287-eb248d4c50c0)]
interface nsIAsyncStreamCopier : nsIRequest
{
/**
@ -64,6 +64,10 @@ interface nsIAsyncStreamCopier : nsIRequest
* specifies how many bytes to read/write at a time. this controls
* the granularity of the copying. it should match the segment size
* of the "buffered" streams involved.
* @param aCloseSource
* true if aSource should be closed after copying.
* @param aCloseSink
* true if aSink should be closed after copying.
*
* NOTE: at least one of the streams must be buffered.
*/
@ -72,7 +76,9 @@ interface nsIAsyncStreamCopier : nsIRequest
in nsIEventTarget aTarget,
in boolean aSourceBuffered,
in boolean aSinkBuffered,
in unsigned long aChunkSize);
in unsigned long aChunkSize,
in boolean aCloseSource,
in boolean aCloseSink);
/**
* asyncCopy triggers the start of the copy. The observer will be notified

View File

@ -466,13 +466,16 @@ NS_NewAsyncStreamCopier(nsIAsyncStreamCopier **result,
nsIEventTarget *target,
PRBool sourceBuffered = PR_TRUE,
PRBool sinkBuffered = PR_TRUE,
PRUint32 chunkSize = 0)
PRUint32 chunkSize = 0,
PRBool closeSource = PR_TRUE,
PRBool closeSink = PR_TRUE)
{
nsresult rv;
nsCOMPtr<nsIAsyncStreamCopier> copier =
do_CreateInstance(NS_ASYNCSTREAMCOPIER_CONTRACTID, &rv);
if (NS_SUCCEEDED(rv)) {
rv = copier->Init(source, sink, target, sourceBuffered, sinkBuffered, chunkSize);
rv = copier->Init(source, sink, target, sourceBuffered, sinkBuffered,
chunkSize, closeSource, closeSink);
if (NS_SUCCEEDED(rv)) {
*result = nsnull;
copier.swap(*result);

View File

@ -92,6 +92,8 @@ nsAsyncStreamCopier::Complete(nsresult status)
nsCOMPtr<nsISupports> ctx;
{
nsAutoLock lock(mLock);
mCopierCtx = nsnull;
if (mIsPending) {
mIsPending = PR_FALSE;
mStatus = status;
@ -152,25 +154,21 @@ nsAsyncStreamCopier::GetStatus(nsresult *status)
NS_IMETHODIMP
nsAsyncStreamCopier::Cancel(nsresult status)
{
if (IsComplete())
nsCOMPtr<nsISupports> copierCtx;
{
nsAutoLock lock(mLock);
if (!mIsPending)
return NS_OK;
copierCtx.swap(mCopierCtx);
}
if (NS_SUCCEEDED(status)) {
NS_WARNING("cancel with non-failure status code");
status = NS_BASE_STREAM_CLOSED;
}
nsCOMPtr<nsIAsyncInputStream> asyncSource = do_QueryInterface(mSource);
if (asyncSource)
asyncSource->CloseWithStatus(status);
else
mSource->Close();
nsCOMPtr<nsIAsyncOutputStream> asyncSink = do_QueryInterface(mSink);
if (asyncSink)
asyncSink->CloseWithStatus(status);
else
mSink->Close();
if (copierCtx)
NS_CancelAsyncCopy(copierCtx, status);
return NS_OK;
}
@ -224,7 +222,9 @@ nsAsyncStreamCopier::Init(nsIInputStream *source,
nsIEventTarget *target,
PRBool sourceBuffered,
PRBool sinkBuffered,
PRUint32 chunkSize)
PRUint32 chunkSize,
PRBool closeSource,
PRBool closeSink)
{
NS_ASSERTION(sourceBuffered || sinkBuffered, "at least one stream must be buffered");
@ -239,6 +239,8 @@ nsAsyncStreamCopier::Init(nsIInputStream *source,
mSource = source;
mSink = sink;
mCloseSource = closeSource;
mCloseSink = closeSink;
mMode = sourceBuffered ? NS_ASYNCCOPY_VIA_READSEGMENTS
: NS_ASYNCCOPY_VIA_WRITESEGMENTS;
@ -281,7 +283,8 @@ nsAsyncStreamCopier::AsyncCopy(nsIRequestObserver *observer, nsISupports *ctx)
// OnAsyncCopyComplete.
NS_ADDREF_THIS();
rv = NS_AsyncCopy(mSource, mSink, mTarget, mMode, mChunkSize,
OnAsyncCopyComplete, this);
OnAsyncCopyComplete, this, mCloseSource, mCloseSink,
getter_AddRefs(mCopierCtx));
if (NS_FAILED(rv)) {
NS_RELEASE_THIS();
Cancel(rv);

View File

@ -76,12 +76,16 @@ private:
nsCOMPtr<nsIEventTarget> mTarget;
nsCOMPtr<nsISupports> mCopierCtx;
PRLock *mLock;
nsAsyncCopyMode mMode;
PRUint32 mChunkSize;
nsresult mStatus;
PRPackedBool mIsPending;
PRPackedBool mCloseSource;
PRPackedBool mCloseSink;
};
#endif // !nsAsyncStreamCopier_h__

View File

@ -2921,7 +2921,7 @@ ServerHandler.prototype =
//
var copier = new StreamCopier(bodyStream, outStream,
null,
true, true, 8192);
true, true, 8192, true, true);
copier.asyncCopy(copyObserver, null);
}
else

View File

@ -0,0 +1,174 @@
const Cc = Components.classes;
const Ci = Components.interfaces;
const Cr = Components.results;
const CC = Components.Constructor;
const StreamCopier = CC("@mozilla.org/network/async-stream-copier;1",
"nsIAsyncStreamCopier",
"init");
const ScriptableInputStream = CC("@mozilla.org/scriptableinputstream;1",
"nsIScriptableInputStream",
"init");
const Pipe = CC("@mozilla.org/pipe;1",
"nsIPipe",
"init");
var pipe1;
var pipe2;
var copier;
var test_result;
var test_content;
var test_source_closed;
var test_sink_closed;
var test_nr;
var copyObserver =
{
onStartRequest: function(request, context) { },
onStopRequest: function(request, cx, statusCode)
{
// check status code
do_check_eq(statusCode, test_result);
// check number of copied bytes
do_check_eq(pipe2.inputStream.available(), test_content.length);
// check content
var scinp = new ScriptableInputStream(pipe2.inputStream);
var content = scinp.read(scinp.available());
do_check_eq(content, test_content);
// check closed sink
try {
pipe2.outputStream.write("closedSinkTest", 14);
do_check_false(test_sink_closed);
}
catch (ex) {
do_check_true(test_sink_closed);
}
// check closed source
try {
pipe1.outputStream.write("closedSourceTest", 16);
do_check_false(test_source_closed);
}
catch (ex) {
do_check_true(test_source_closed);
}
do_timeout(0, "do_test();");
},
QueryInterface: function(aIID)
{
if (aIID.equals(Ci.nsIRequestObserver) ||
aIID.equals(Ci.nsISupports))
return this;
throw Cr.NS_ERROR_NO_INTERFACE;
}
};
function startCopier(closeSource, closeSink) {
pipe1 = new Pipe(true /* nonBlockingInput */,
true /* nonBlockingOutput */,
0 /* segmentSize */,
0xffffffff /* segmentCount */,
null /* segmentAllocator */);
pipe2 = new Pipe(true /* nonBlockingInput */,
true /* nonBlockingOutput */,
0 /* segmentSize */,
0xffffffff /* segmentCount */,
null /* segmentAllocator */);
copier = new StreamCopier(pipe1.inputStream /* aSource */,
pipe2.outputStream /* aSink */,
null /* aTarget */,
true /* aSourceBuffered */,
true /* aSinkBuffered */,
8192 /* aChunkSize */,
closeSource /* aCloseSource */,
closeSink /* aCloseSink */);
copier.asyncCopy(copyObserver, null);
}
function do_test() {
test_nr++;
test_content = "test" + test_nr;
switch (test_nr) {
case 1:
case 2: // close sink
case 3: // close source
case 4: // close both
// test canceling transfer
// use some undefined error code to check if it is successfully passed
// to the request observer
test_result = 0x87654321;
test_source_closed = ((test_nr-1)>>1 != 0);
test_sink_closed = ((test_nr-1)%2 != 0);
startCopier(test_source_closed, test_sink_closed);
pipe1.outputStream.write(test_content, test_content.length);
pipe1.outputStream.flush();
do_timeout(20,
"copier.cancel(test_result);" +
"pipe1.outputStream.write(\"a\", 1);");
break;
case 5:
case 6: // close sink
case 7: // close source
case 8: // close both
// test copying with EOF on source
test_result = 0;
test_source_closed = ((test_nr-5)>>1 != 0);
test_sink_closed = ((test_nr-5)%2 != 0);
startCopier(test_source_closed, test_sink_closed);
pipe1.outputStream.write(test_content, test_content.length);
// we will close the source
test_source_closed = true;
pipe1.outputStream.close();
break;
case 9:
case 10: // close sink
case 11: // close source
case 12: // close both
// test copying with error on sink
// use some undefined error code to check if it is successfully passed
// to the request observer
test_result = 0x87654321;
test_source_closed = ((test_nr-9)>>1 != 0);
test_sink_closed = ((test_nr-9)%2 != 0);
startCopier(test_source_closed, test_sink_closed);
pipe1.outputStream.write(test_content, test_content.length);
pipe1.outputStream.flush();
// we will close the sink
test_sink_closed = true;
do_timeout(20,
"pipe2.outputStream" +
" .QueryInterface(Ci.nsIAsyncOutputStream)" +
" .closeWithStatus(test_result);" +
"pipe1.outputStream.write(\"a\", 1);");
break;
case 13:
do_test_finished();
break;
}
}
function run_test() {
test_nr = 0;
do_timeout(0, "do_test();");
do_test_pending();
}

View File

@ -20,7 +20,7 @@ function run_test() {
var streamCopier = Cc["@mozilla.org/network/async-stream-copier;1"]
.createInstance(Ci.nsIAsyncStreamCopier);
streamCopier.init(inStr, pipe.outputStream, null, true, true, 1024);
streamCopier.init(inStr, pipe.outputStream, null, true, true, 1024, true, true);
var ctx = {
};

View File

@ -250,6 +250,10 @@ public:
, mChunkSize(0)
, mEventInProcess(PR_FALSE)
, mEventIsPending(PR_FALSE)
, mCloseSource(PR_TRUE)
, mCloseSink(PR_TRUE)
, mCanceled(PR_FALSE)
, mCancelStatus(NS_OK)
{
}
@ -266,7 +270,9 @@ public:
nsIEventTarget *target,
nsAsyncCopyCallbackFun callback,
void *closure,
PRUint32 chunksize)
PRUint32 chunksize,
PRBool closeSource,
PRBool closeSink)
{
mSource = source;
mSink = sink;
@ -274,6 +280,8 @@ public:
mCallback = callback;
mClosure = closure;
mChunkSize = chunksize;
mCloseSource = closeSource;
mCloseSink = closeSink;
mLock = PR_NewLock();
if (!mLock)
@ -295,11 +303,28 @@ public:
return;
nsresult sourceCondition, sinkCondition;
nsresult cancelStatus;
PRBool canceled;
{
nsAutoLock lock(mLock);
canceled = mCanceled;
cancelStatus = mCancelStatus;
}
// ok, copy data from source to sink.
for (;;) {
PRUint32 n = DoCopy(&sourceCondition, &sinkCondition);
if (NS_FAILED(sourceCondition) || NS_FAILED(sinkCondition) || n == 0) {
PRUint32 n;
PRBool copyFailed = PR_FALSE;
if (!canceled) {
n = DoCopy(&sourceCondition, &sinkCondition);
copyFailed = NS_FAILED(sourceCondition) ||
NS_FAILED(sinkCondition) || n == 0;
nsAutoLock lock(mLock);
canceled = mCanceled;
cancelStatus = mCancelStatus;
}
if (copyFailed && !canceled) {
if (sourceCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSource) {
// need to wait for more data from source. while waiting for
// more source data, be sure to observe failures on output end.
@ -309,6 +334,7 @@ public:
mAsyncSink->AsyncWait(this,
nsIAsyncOutputStream::WAIT_CLOSURE_ONLY,
0, nsnull);
break;
}
else if (sinkCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSink) {
// need to wait for more room in the sink. while waiting for
@ -320,21 +346,29 @@ public:
mAsyncSource->AsyncWait(this,
nsIAsyncInputStream::WAIT_CLOSURE_ONLY,
0, nsnull);
break;
}
else {
}
if (copyFailed || canceled) {
if (mCloseSource) {
// close source
if (mAsyncSource)
mAsyncSource->CloseWithStatus(sinkCondition);
mAsyncSource->CloseWithStatus(canceled ? cancelStatus :
sinkCondition);
else
mSource->Close();
}
mAsyncSource = nsnull;
mSource = nsnull;
if (mCloseSink) {
// close sink
if (mAsyncSink)
mAsyncSink->CloseWithStatus(sourceCondition);
mAsyncSink->CloseWithStatus(canceled ? cancelStatus :
sourceCondition);
else
mSink->Close();
}
mAsyncSink = nsnull;
mSink = nsnull;
@ -345,14 +379,29 @@ public:
status = sinkCondition;
if (status == NS_BASE_STREAM_CLOSED)
status = NS_OK;
mCallback(mClosure, status);
}
mCallback(mClosure, canceled ? cancelStatus : status);
}
break;
}
}
}
nsresult Cancel(nsresult aReason)
{
nsAutoLock lock(mLock);
if (mCanceled)
return NS_ERROR_FAILURE;
if (NS_SUCCEEDED(aReason)) {
NS_WARNING("cancel with non-failure status code");
aReason = NS_BASE_STREAM_CLOSED;
}
mCanceled = PR_TRUE;
mCancelStatus = aReason;
return NS_OK;
}
NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream *source)
{
PostContinuationEvent();
@ -421,6 +470,10 @@ protected:
PRUint32 mChunkSize;
PRPackedBool mEventInProcess;
PRPackedBool mEventIsPending;
PRPackedBool mCloseSource;
PRPackedBool mCloseSink;
PRPackedBool mCanceled;
nsresult mCancelStatus;
};
NS_IMPL_THREADSAFE_ISUPPORTS3(nsAStreamCopier,
@ -523,7 +576,10 @@ NS_AsyncCopy(nsIInputStream *source,
nsAsyncCopyMode mode,
PRUint32 chunkSize,
nsAsyncCopyCallbackFun callback,
void *closure)
void *closure,
PRBool closeSource,
PRBool closeSink,
nsISupports **aCopierCtx)
{
NS_ASSERTION(target, "non-null target required");
@ -540,7 +596,14 @@ NS_AsyncCopy(nsIInputStream *source,
// Start() takes an owning ref to the copier...
NS_ADDREF(copier);
rv = copier->Start(source, sink, target, callback, closure, chunkSize);
rv = copier->Start(source, sink, target, callback, closure, chunkSize,
closeSource, closeSink);
if (aCopierCtx) {
*aCopierCtx = static_cast<nsISupports*>(
static_cast<nsIRunnable*>(copier));
NS_ADDREF(*aCopierCtx);
}
NS_RELEASE(copier);
return rv;
@ -548,6 +611,16 @@ NS_AsyncCopy(nsIInputStream *source,
//-----------------------------------------------------------------------------
NS_COM nsresult
NS_CancelAsyncCopy(nsISupports *aCopierCtx, nsresult aReason)
{
nsAStreamCopier *copier = static_cast<nsAStreamCopier *>(
static_cast<nsIRunnable *>(aCopierCtx));
return copier->Cancel(aReason);
}
//-----------------------------------------------------------------------------
NS_COM nsresult
NS_ConsumeStream(nsIInputStream *stream, PRUint32 maxCount, nsACString &result)
{

View File

@ -98,6 +98,12 @@ typedef void (* nsAsyncCopyCallbackFun)(void *closure, nsresult status);
* stream operation returns NS_BASE_STREAM_WOULD_BLOCK, then the stream will
* be QI'd to nsIAsync{In,Out}putStream and its AsyncWait method will be used
* to determine when to resume copying.
*
* Source and sink are closed by default when copying finishes or when error
* occurs. Caller can prevent closing source or sink by setting aCloseSource
* or aCloseSink to PR_FALSE.
*
* Caller can obtain aCopierCtx to be able to cancel copying.
*/
extern NS_COM nsresult
NS_AsyncCopy(nsIInputStream *aSource,
@ -106,7 +112,22 @@ NS_AsyncCopy(nsIInputStream *aSource,
nsAsyncCopyMode aMode = NS_ASYNCCOPY_VIA_READSEGMENTS,
PRUint32 aChunkSize = 4096,
nsAsyncCopyCallbackFun aCallbackFun = nsnull,
void *aCallbackClosure = nsnull);
void *aCallbackClosure = nsnull,
PRBool aCloseSource = PR_TRUE,
PRBool aCloseSink = PR_TRUE,
nsISupports **aCopierCtx = nsnull);
/**
* This function cancels copying started by function NS_AsyncCopy.
*
* @param aCopierCtx
* Copier context returned by NS_AsyncCopy.
* @param aReason
* A failure code indicating why the operation is being canceled.
* It is an error to pass a success code.
*/
extern NS_COM nsresult
NS_CancelAsyncCopy(nsISupports *aCopierCtx, nsresult aReason);
/**
* This function copies all of the available data from the stream (up to at