Bug 454740 - Asynchronous storage should batch/chunk results

This changeset batches results obtained by the async storage API so we are not
flooding the calling thread with so many events.
r=asuth
This commit is contained in:
Shawn Wilsher 2008-10-29 13:13:32 -04:00
parent 4a8ab3f0a5
commit da47747760
2 changed files with 199 additions and 102 deletions

View File

@ -41,6 +41,7 @@
#include "nsAutoPtr.h" #include "nsAutoPtr.h"
#include "nsAutoLock.h" #include "nsAutoLock.h"
#include "nsCOMArray.h" #include "nsCOMArray.h"
#include "prtime.h"
#include "sqlite3.h" #include "sqlite3.h"
@ -53,6 +54,20 @@
#include "mozStorageError.h" #include "mozStorageError.h"
#include "mozStorageEvents.h" #include "mozStorageEvents.h"
/**
* The following constants help batch rows into result sets.
* MAX_MILLISECONDS_BETWEEN_RESULTS was chosen because any user-based task that
* takes less than 200 milliseconds is considered to feel instantaneous to end
* users. MAX_ROWS_PER_RESULT was arbitrarily chosen to reduce the number of
* dispatches to calling thread, while also providing reasonably-sized sets of
* data for consumers. Both of these constants are used because we assume that
* consumers are trying to avoid blocking their execution thread for long
* periods of time, and dispatching many small events to the calling thread will
* end up blocking it.
*/
#define MAX_MILLISECONDS_BETWEEN_RESULTS 100
#define MAX_ROWS_PER_RESULT 15
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
//// Asynchronous Statement Execution //// Asynchronous Statement Execution
@ -94,6 +109,8 @@ public:
NS_IMETHOD Run() NS_IMETHOD Run()
{ {
NS_ASSERTION(mCallback, "Trying to notify about results without a callback!");
if (mEventStatus->runEvent()) if (mEventStatus->runEvent())
(void)mCallback->HandleResult(mResults); (void)mCallback->HandleResult(mResults);
@ -213,6 +230,8 @@ public:
, mTransactionManager(nsnull) , mTransactionManager(nsnull)
, mCallback(aCallback) , mCallback(aCallback)
, mCallingThread(do_GetCurrentThread()) , mCallingThread(do_GetCurrentThread())
, mMaxIntervalWait(PR_MicrosecondsToInterval(MAX_MILLISECONDS_BETWEEN_RESULTS))
, mIntervalStart(PR_IntervalNow())
, mState(PENDING) , mState(PENDING)
, mCancelRequested(PR_FALSE) , mCancelRequested(PR_FALSE)
, mLock(nsAutoLock::NewLock("AsyncExecute::mLock")) , mLock(nsAutoLock::NewLock("AsyncExecute::mLock"))
@ -250,106 +269,17 @@ public:
} }
// Execute each statement, giving the callback results if it returns any. // Execute each statement, giving the callback results if it returns any.
nsresult rv = NS_OK;
for (PRUint32 i = 0; i < mStatements.Length(); i++) { for (PRUint32 i = 0; i < mStatements.Length(); i++) {
// We need to hold a lock for statement execution so we can properly PRBool finished = (i == (mStatements.Length() - 1));
// reflect state in case we are canceled. We unlock in a few areas in if (!ExecuteAndProcessStatement(mStatements[i], finished))
// order to allow for cancelation to occur.
nsAutoLock mutex(mLock);
while (PR_TRUE) {
int rc = sqlite3_step(mStatements[i]);
// Break out if we have no more results
if (rc == SQLITE_DONE)
break;
// Some errors are not fatal, and we can handle them and continue.
if (rc != SQLITE_OK && rc != SQLITE_ROW) {
if (rc == SQLITE_BUSY) {
// We do not want to hold our lock while we yield.
nsAutoUnlock cancelationScope(mLock);
// Yield, and try again
PR_Sleep(PR_INTERVAL_NO_WAIT);
continue;
}
// Set error state
mState = ERROR;
// No longer need to hold our mutex
mutex.unlock();
// Notify
sqlite3 *db = sqlite3_db_handle(mStatements[i]);
(void)NotifyError(rc, sqlite3_errmsg(db));
// And complete
return NotifyComplete();
}
// If we do not have a callback, there's no point in executing this
// statement anymore.
if (!mCallback)
break;
// If we have been canceled, there is no point in going on...
if (mCancelRequested) {
mState = CANCELED;
mutex.unlock();
return NotifyComplete();
}
// For the rest of this loop, it is safe to not hold the lock and allow
// for cancelation. We may add an event to the calling thread, but that
// thread will not end up running when it checks back with us to see if
// it should run.
nsAutoUnlock cancelationScope(mLock);
// Build result object
// XXX bug 454740 chunk these results better
nsRefPtr<mozStorageResultSet> results(new mozStorageResultSet());
if (!results) {
rv = NS_ERROR_OUT_OF_MEMORY;
break;
}
nsRefPtr<mozStorageRow> row(new mozStorageRow());
if (!row) {
rv = NS_ERROR_OUT_OF_MEMORY;
break;
}
rv = row->initialize(mStatements[i]);
if (NS_FAILED(rv))
break;
rv = results->add(row);
if (NS_FAILED(rv))
break;
// Notify caller
(void)NotifyResults(results);
}
// If we have an error that we have not already notified about, set our
// state accordingly, and notify.
if (NS_FAILED(rv)) {
mState = ERROR;
// We no longer need to hold our mutex
mutex.unlock();
(void)NotifyError(mozIStorageError::ERROR, "");
break; break;
}
// If we are done, we need to set our state accordingly while we still
// hold our lock. We would have already dropped out of the loop if we
// were canceled or had an error at this point.
if (i == (mStatements.Length() - 1))
mState = COMPLETED;
} }
// If we still have results that we haven't notified about, take care of
// them now.
if (mResultSet)
(void)NotifyResults();
// Notify about completion // Notify about completion
return NotifyComplete(); return NotifyComplete();
} }
@ -404,15 +334,161 @@ public:
} }
private: private:
AsyncExecute() { } AsyncExecute() : mMaxIntervalWait(0) { }
~AsyncExecute() ~AsyncExecute()
{ {
nsAutoLock::DestroyLock(mLock); nsAutoLock::DestroyLock(mLock);
} }
/**
* Executes a given statement until completion, an error occurs, or we are
* canceled. If aFinished is true, we know that we are the last statement,
* and should set mState accordingly.
*
* @pre mLock is not held
*
* @param aStatement
* The statement to execute and then process.
* @param aFinished
* Indicates if this is the last statement or not. If it is, we have
* to set the proper state.
* @returns true if we should continue to process statements, false otherwise.
*/
PRBool ExecuteAndProcessStatement(sqlite3_stmt *aStatement, PRBool aFinished)
{
// We need to hold a lock for statement execution so we can properly
// reflect state in case we are canceled. We unlock in a few areas in
// order to allow for cancelation to occur.
nsAutoLock mutex(mLock);
nsresult rv = NS_OK;
while (PR_TRUE) {
int rc = sqlite3_step(aStatement);
// Break out if we have no more results
if (rc == SQLITE_DONE)
break;
// Some errors are not fatal, and we can handle them and continue.
if (rc != SQLITE_OK && rc != SQLITE_ROW) {
if (rc == SQLITE_BUSY) {
// We do not want to hold our lock while we yield.
nsAutoUnlock cancelationScope(mLock);
// Yield, and try again
PR_Sleep(PR_INTERVAL_NO_WAIT);
continue;
}
// Set error state
mState = ERROR;
// Drop our mutex - NotifyError doesn't want it held
mutex.unlock();
// Notify
sqlite3 *db = sqlite3_db_handle(aStatement);
(void)NotifyError(rc, sqlite3_errmsg(db));
// And stop processing statements
return PR_FALSE;
}
// If we do not have a callback, there's no point in executing this
// statement anymore, but we wish to continue to execute statements. We
// also need to update our state if we are finished, so break out of the
// while loop.
if (!mCallback)
break;
// If we have been canceled, there is no point in going on...
if (mCancelRequested) {
mState = CANCELED;
return PR_FALSE;
}
// Build our results and notify if it's time.
rv = BuildAndNotifyResults(aStatement);
if (NS_FAILED(rv))
break;
}
// If we have an error that we have not already notified about, set our
// state accordingly, and notify.
if (NS_FAILED(rv)) {
mState = ERROR;
// Drop our mutex - NotifyError doesn't want it held
mutex.unlock();
// Notify, and stop processing statements.
(void)NotifyError(mozIStorageError::ERROR, "");
return PR_FALSE;
}
// If we are done, we need to set our state accordingly while we still
// hold our lock. We would have already returned if we were canceled or had
// an error at this point.
if (aFinished)
mState = COMPLETED;
return PR_TRUE;
}
/**
* Builds a result set up with a row from a given statement. If we meet the
* right criteria, go ahead and notify about this results too.
*
* @pre mLock is held
*
* @param aStatement
* The statement to get the row data from.
*/
nsresult BuildAndNotifyResults(sqlite3_stmt *aStatement)
{
NS_ASSERTION(mCallback, "Trying to dispatch results without a callback!");
// At this point, it is safe to not hold the lock and allow for cancelation.
// We may add an event to the calling thread, but that thread will not end
// up running when it checks back with us to see if it should run.
nsAutoUnlock cancelationScope(mLock);
// Build result object if we need it.
if (!mResultSet)
mResultSet = new mozStorageResultSet();
NS_ENSURE_TRUE(mResultSet, NS_ERROR_OUT_OF_MEMORY);
nsRefPtr<mozStorageRow> row(new mozStorageRow());
NS_ENSURE_TRUE(row, NS_ERROR_OUT_OF_MEMORY);
nsresult rv = row->initialize(aStatement);
NS_ENSURE_SUCCESS(rv, rv);
rv = mResultSet->add(row);
NS_ENSURE_SUCCESS(rv, rv);
// If we have hit our maximum number of allowed results, or if we have hit
// the maximum amount of time we want to wait for results, notify the
// calling thread about it.
PRIntervalTime now = PR_IntervalNow();
PRIntervalTime delta = now - mIntervalStart;
if (mResultSet->rows() >= MAX_ROWS_PER_RESULT || delta > mMaxIntervalWait) {
// Notify the caller
rv = NotifyResults();
if (NS_FAILED(rv))
return NS_OK; // we'll try again with the next result
// Reset our start time
mIntervalStart = now;
}
return NS_OK;
}
/** /**
* Notifies callback about completion, and does any necessary cleanup. * Notifies callback about completion, and does any necessary cleanup.
*
* @pre mLock is not held
*/ */
nsresult NotifyComplete() nsresult NotifyComplete()
{ {
@ -460,6 +536,8 @@ private:
/** /**
* Notifies callback about an error. * Notifies callback about an error.
* *
* @pre mLock is not held
*
* @param aErrorCode * @param aErrorCode
* The error code defined in mozIStorageError for the error. * The error code defined in mozIStorageError for the error.
* @param aMessage * @param aMessage
@ -484,18 +562,20 @@ private:
/** /**
* Notifies the callback about a result set. * Notifies the callback about a result set.
* *
* @param aResultSet * @pre mLock is not held
* The mozIStorageResultSet to notify the callback about.
*/ */
nsresult NotifyResults(mozStorageResultSet *aResultSet) nsresult NotifyResults()
{ {
NS_ASSERTION(mCallback, "NotifyResults called without a callback!"); NS_ASSERTION(mCallback, "NotifyResults called without a callback!");
nsRefPtr<CallbackResultNotifier> notifier = nsRefPtr<CallbackResultNotifier> notifier =
new CallbackResultNotifier(mCallback, aResultSet, this); new CallbackResultNotifier(mCallback, mResultSet, this);
NS_ENSURE_TRUE(notifier, NS_ERROR_OUT_OF_MEMORY); NS_ENSURE_TRUE(notifier, NS_ERROR_OUT_OF_MEMORY);
return mCallingThread->Dispatch(notifier, NS_DISPATCH_NORMAL); nsresult rv = mCallingThread->Dispatch(notifier, NS_DISPATCH_NORMAL);
if (NS_SUCCEEDED(rv))
mResultSet = nsnull; // we no longer own it on success
return rv;
}; };
nsTArray<sqlite3_stmt *> mStatements; nsTArray<sqlite3_stmt *> mStatements;
@ -503,6 +583,18 @@ private:
mozStorageTransaction *mTransactionManager; mozStorageTransaction *mTransactionManager;
mozIStorageStatementCallback *mCallback; mozIStorageStatementCallback *mCallback;
nsCOMPtr<nsIThread> mCallingThread; nsCOMPtr<nsIThread> mCallingThread;
nsRefPtr<mozStorageResultSet> mResultSet;
/**
* The maximum amount of time we want to wait between results. Defined by
* MAX_MILLISECONDS_BETWEEN_RESULTS and set at construction.
*/
const PRIntervalTime mMaxIntervalWait;
/**
* The start time since our last set of results.
*/
PRIntervalTime mIntervalStart;
/** /**
* Indicates the state the object is currently in. * Indicates the state the object is currently in.

View File

@ -58,6 +58,11 @@ public:
*/ */
nsresult add(mozIStorageRow *aTuple); nsresult add(mozIStorageRow *aTuple);
/**
* @returns the number of rows this result set holds.
*/
PRInt32 rows() const { return mData.Count(); }
private: private:
/** /**
* Stores the current index of the active result set. * Stores the current index of the active result set.