Bug 1401990 - Use max_request_bytes instead of max_post_bytes if sync server provides both r=markh

MozReview-Commit-ID: GJadUIuSGKt

--HG--
extra : rebase_source : be8da0b14c0e1522bc1dc025871fc34830d9ae58
This commit is contained in:
Thom Chiovoloni 2017-10-02 16:42:22 -04:00
parent b1236d3f9d
commit e6050e53aa
2 changed files with 618 additions and 212 deletions

services/sync/modules/record.js

@@ -790,45 +790,96 @@ Collection.prototype = {
}
return Resource.prototype.post.call(this, data);
}
let getConfig = (name, defaultVal) => {
// serverConfiguration is allowed to be missing during tests.
if (this._service.serverConfiguration && this._service.serverConfiguration.hasOwnProperty(name)) {
return this._service.serverConfiguration[name];
}
return defaultVal;
};
// On a server that does not support the batch API, we expect the /info/configuration
// endpoint to provide "max_record_payload_bytes" and "max_request_bytes" limits.
// On a server that supports the batching API, we expect "max_record_payload_bytes"
// (as before), as well as "max_post_bytes", "max_post_records", "max_total_bytes" and
// "max_total_records". Much of the complexity here and in enqueue is attempting to
// handle both these cases simultaneously.
let config = {
// Note that from the server's POV, max_post_bytes is the sum of payload
// lengths, but we treat it equivalently to max_request_bytes (which is
// payload + metadata lengths).
max_post_bytes: getConfig("max_post_bytes",
getConfig("max_request_bytes", 260 * 1024)),
max_post_records: getConfig("max_post_records", Infinity),
max_batch_bytes: getConfig("max_total_bytes", Infinity),
max_batch_records: getConfig("max_total_records", Infinity),
max_record_payload_bytes: getConfig("max_record_payload_bytes", 256 * 1024),
};
if (config.max_post_bytes <= config.max_record_payload_bytes) {
this._log.warn("Server configuration max_post_bytes is too low for max_record_payload_bytes", config);
// Assume 4k of extra is enough. See also getMaxRecordPayloadSize in service.js
config.max_record_payload_bytes = config.max_post_bytes - 4096;
}
this._log.trace("new PostQueue created with config", config);
return new PostQueue(poster, timestamp, config, log, postCallback);
return new PostQueue(poster, timestamp,
this._service.serverConfiguration || {}, log, postCallback);
},
};
// These are limits for requests provided by the server at the
// info/configuration endpoint -- server documentation is available here:
// http://moz-services-docs.readthedocs.io/en/latest/storage/apis-1.5.html#api-instructions
//
// All are optional, however we synthesize (non-infinite) default values for the
// "max_request_bytes" and "max_record_payload_bytes" options. For the others,
// we treat the limit as infinite if they're missing.
//
// These are also the only ones that all servers (even batching-disabled
// servers) should support, at least once this server-syncstorage patch is
// everywhere: https://github.com/mozilla-services/server-syncstorage/pull/74
//
// Batching-enabled servers also limit the amount of payload data and the
// number of records we can send in a single post, as well as in the whole
// batch. Note that the byte limits for these are just with respect to the
// *payload* data, e.g. the data appearing in the payload property (a
// string) of the object.
//
// Note that in practice, these limits should be sensible, but the code makes
// no assumptions about this. If we hit any of the limits, we perform the
// corresponding action (e.g. submit a request, possibly committing the
// current batch).
const DefaultPostQueueConfig = Object.freeze({
// Number of total bytes allowed in a request
max_request_bytes: 260 * 1024,
// Maximum number of bytes allowed in the "payload" property of a record.
max_record_payload_bytes: 256 * 1024,
// The limit for how many bytes worth of data appearing in "payload"
// properties are allowed in a single post.
max_post_bytes: Infinity,
// The limit for the number of records allowed in a single post.
max_post_records: Infinity,
// The limit for how many bytes worth of data appearing in "payload"
// properties are allowed in a batch. (Same as max_post_bytes, but for
// batches).
max_total_bytes: Infinity,
// The limit for the number of records allowed in a batch. (Same
// as max_post_records, but for batches).
max_total_records: Infinity,
});
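// For concreteness, an /info/configuration response might look like one of
// these (a sketch; the numbers are illustrative, not mandated by the docs
// linked above):
//
//   A non-batching server:
//     { "max_record_payload_bytes": 262144, "max_request_bytes": 266240 }
//
//   A batching server:
//     { "max_record_payload_bytes": 262144, "max_post_bytes": 262144,
//       "max_post_records": 100, "max_total_bytes": 104857600,
//       "max_total_records": 10000 }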
// Manages a pair of (byte, count) limits for a PostQueue, such as
// (max_post_bytes, max_post_records) or (max_total_bytes, max_total_records).
class LimitTracker {
constructor(maxBytes, maxRecords) {
this.maxBytes = maxBytes;
this.maxRecords = maxRecords;
this.curBytes = 0;
this.curRecords = 0;
}
clear() {
this.curBytes = 0;
this.curRecords = 0;
}
canAddRecord(payloadSize) {
// The record counts are inclusive, but depending on the version of the
// server, the byte counts may or may not be inclusive (See
// https://github.com/mozilla-services/server-syncstorage/issues/73).
return this.curRecords + 1 <= this.maxRecords &&
this.curBytes + payloadSize < this.maxBytes;
}
canNeverAdd(recordSize) {
return recordSize >= this.maxBytes;
}
didAddRecord(recordSize) {
if (!this.canAddRecord(recordSize)) {
// This is a bug, caller is expected to call canAddRecord first.
throw new Error("LimitTracker.canAddRecord must be checked before adding record");
}
this.curRecords += 1;
this.curBytes += recordSize;
}
}
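// A minimal usage sketch of LimitTracker (the sizes are illustrative):
//
//   let tracker = new LimitTracker(100, 2); // maxBytes, maxRecords
//   tracker.canAddRecord(40);  // true: 0 + 1 <= 2 and 0 + 40 < 100
//   tracker.didAddRecord(40);  // curBytes == 40, curRecords == 1
//   tracker.canNeverAdd(100);  // true: a 100-byte payload can never fit
//   tracker.clear();           // reset when a post or batch completes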
/* A helper to manage the posting of records while respecting the various
size limits.
@@ -843,14 +894,22 @@ Collection.prototype = {
In most cases we expect there to be exactly 1 batch consisting of possibly
multiple POSTs.
*/
function PostQueue(poster, timestamp, config, log, postCallback) {
function PostQueue(poster, timestamp, serverConfig, log, postCallback) {
// The "post" function we should use when it comes time to do the post.
this.poster = poster;
this.log = log;
// The config we use. We expect it to have fields "max_post_records",
// "max_batch_records", "max_post_bytes", and "max_batch_bytes"
this.config = config;
let config = Object.assign({}, DefaultPostQueueConfig, serverConfig);
if (!serverConfig.max_request_bytes && serverConfig.max_post_bytes) {
// Use max_post_bytes for max_request_bytes if it's missing. Only needed
// until server-syncstorage/pull/74 is everywhere, and even then it's
// unnecessary only if the server limits are configured sanely (there's no
// guarantee of that -- at least before that patch is fully deployed).
config.max_request_bytes = serverConfig.max_post_bytes;
}
this.log.trace("new PostQueue config (after defaults): ", config);
// The callback we make with the response when we do get around to making the
// post (which could be during any of the enqueue() calls or the final flush())
@@ -861,19 +920,25 @@ function PostQueue(poster, timestamp, config, log, postCallback) {
// complete, or it's a post to a server that does not understand batching.
this.postCallback = postCallback;
// Tracks the count and combined payload size for the records we've queued
// so far but are yet to POST.
this.postLimits = new LimitTracker(config.max_post_bytes, config.max_post_records);
// As above, but for the batch size.
this.batchLimits = new LimitTracker(config.max_total_bytes, config.max_total_records);
// Limit for the size of `this.queued` before we do a post.
this.maxRequestBytes = config.max_request_bytes;
// Limit for the size of incoming record payloads.
this.maxPayloadBytes = config.max_record_payload_bytes;
// The string where we are capturing the stringified version of the records
// queued so far. It will always be invalid JSON as it is always missing the
// closing bracket.
// closing bracket. It's also used to track whether or not we've gone past
// maxRequestBytes.
this.queued = "";
// The number of records we've queued so far but are yet to POST.
this.numQueued = 0;
// The number of records/bytes we've processed in previous POSTs for our
// current batch. Does *not* include records currently queued for the next POST.
this.numAlreadyBatched = 0;
this.bytesAlreadyBatched = 0;
// The ID of our current batch. Can be undefined (meaning we are yet to make
// the first post of a batch, so don't know if we have a batch), null (meaning
// we've made the first post but the server response indicated no batching
@@ -897,43 +962,44 @@ PostQueue.prototype = {
let bytes = JSON.stringify(jsonRepr);
// Tests sometimes return objects without payloads, and we just use the
// byte length for those cases.
let payloadLength = jsonRepr.payload ? jsonRepr.payload.length : bytes.length;
if (payloadLength > this.config.max_record_payload_bytes) {
return { enqueued: false, error: new Error("Single record too large to submit to server") };
}
// We use the payload size for the LimitTrackers, since that's what the
// byte limits other than max_request_bytes refer to.
let payloadLength = jsonRepr.payload.length;
// The `+ 2` is to account for the 2-byte (maximum) overhead (one byte for
// the leading comma or "[", which all records will have, and the other for
// the final trailing "]", only present for the last record).
let newLength = this.queued.length + bytes.length + 2;
let newRecordCount = this.numQueued + 1;
let encodedLength = bytes.length + 2;
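// (Worked example: two records whose JSON is 4 bytes each yield the body
// "[xxxx,xxxx]", 11 bytes; budgeting 4 + 2 = 6 bytes per record gives 12,
// so the estimate never undercounts the final request size.)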
// Note that the max_post_records and max_batch_records server limits are
// inclusive (e.g. if the max_post_records == 100, it will allow a post with
// 100 records), but the byte limits are not. (See
// https://github.com/mozilla-services/server-syncstorage/issues/73)
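// (So with max_post_records == 100 the 100th record is accepted, while with
// max_post_bytes == 1000 a post whose payloads total exactly 1000 bytes is
// rejected; LimitTracker.canAddRecord mirrors this with <= for the record
// count and < for the byte count.)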
// Check first if there's some limit that indicates we cannot ever enqueue
// this record.
let isTooBig = this.postLimits.canNeverAdd(payloadLength) ||
this.batchLimits.canNeverAdd(payloadLength) ||
encodedLength >= this.maxRequestBytes ||
payloadLength >= this.maxPayloadBytes;
// Have we exceeded the maximum size or record count for a single POST?
let postSizeExceeded = newRecordCount > this.config.max_post_records ||
newLength >= this.config.max_post_bytes;
// Have we exceeded the maximum size or record count for the entire batch?
let batchSizeExceeded = (newRecordCount + this.numAlreadyBatched) > this.config.max_batch_records ||
(newLength + this.bytesAlreadyBatched) >= this.config.max_batch_bytes;
if (postSizeExceeded || batchSizeExceeded) {
this.log.trace("PostQueue flushing due to ", { postSizeExceeded, batchSizeExceeded });
// We need to write the queue out before handling this one, but we only
// commit the batch (and thus start a new one) if the batch is full.
await this.flush(batchSizeExceeded);
if (isTooBig) {
return { enqueued: false, error: new Error("Single record too large to submit to server") };
}
let canPostRecord = this.postLimits.canAddRecord(payloadLength);
let canBatchRecord = this.batchLimits.canAddRecord(payloadLength);
let canSendRecord = this.queued.length + encodedLength < this.maxRequestBytes;
if (!canPostRecord || !canBatchRecord || !canSendRecord) {
this.log.trace("PostQueue flushing: ", {canPostRecord, canSendRecord, canBatchRecord});
// We need to write the queue out before handling this one, but we only
// commit the batch (and thus start a new one) if the record couldn't fit
// inside the batch.
await this.flush(!canBatchRecord);
}
this.postLimits.didAddRecord(payloadLength);
this.batchLimits.didAddRecord(payloadLength);
// Either a ',' or a '[' depending on whether this is the first record.
this.queued += this.numQueued ? "," : "[";
this.queued += this.queued.length ? "," : "[";
this.queued += bytes;
this.numQueued++;
return { enqueued: true };
},
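// Taken together, a caller drives the queue roughly like this (a sketch;
// poster, log, postCallback and records are stand-ins rather than names
// from this patch):
//
//   let pq = new PostQueue(poster, lastModified, serverConfig, log, postCallback);
//   for (let record of records) {
//     let { enqueued, error } = await pq.enqueue(record);
//     if (!enqueued) {
//       log.warn("Skipping record too large for the server", error);
//     }
//   }
//   await pq.flush(true); // final flush; commits the batch if one is open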
@@ -962,17 +1028,14 @@ PostQueue.prototype = {
headers.push(["x-if-unmodified-since", this.lastModified]);
this.log.info(`Posting ${this.numQueued} records of ${this.queued.length + 1} bytes with batch=${batch}`);
let numQueued = this.postLimits.curRecords;
this.log.info(`Posting ${numQueued} records of ${this.queued.length + 1} bytes with batch=${batch}`);
let queued = this.queued + "]";
if (finalBatchPost) {
this.bytesAlreadyBatched = 0;
this.numAlreadyBatched = 0;
} else {
this.bytesAlreadyBatched += queued.length;
this.numAlreadyBatched += this.numQueued;
this.batchLimits.clear();
}
this.postLimits.clear();
this.queued = "";
this.numQueued = 0;
let response = await this.poster(queued, headers, batch, !!(finalBatchPost && this.batchID !== null));
if (!response.success) {

services/sync/tests/unit/test_postqueue.js

@@ -6,23 +6,121 @@ let { PostQueue } = Cu.import("resource://services-sync/record.js", {});
initTestLogging("Trace");
function makeRecord(nbytes) {
// make a string 2 bytes shorter - the added quotes will make it correct.
return {
toJSON: () => "x".repeat(nbytes - 2),
toJSON: () => ({ payload: "x".repeat(nbytes) }),
}
}
// Note: This is 14 bytes. Tests make assumptions about this (even if it's just
// in setting config.max_request_bytes to a specific value).
makeRecord.nonPayloadOverhead = JSON.stringify(makeRecord(0).toJSON()).length;
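// (That stringification is '{"payload":""}': two braces, '"payload":' with
// its quotes and colon (10 characters), and the empty payload's two quotes
// -- 14 bytes in all.)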
// Gives how many encoded bytes a request with the given payload
// sizes will be (assuming the records were created by makeRecord), e.g.
// requestBytesFor([20]) => 36 (1 + 20 + 1 + 14), requestBytesFor([20, 20]) => 71
function requestBytesFor(recordPayloadByteCounts) {
let requestBytes = 1;
for (let size of recordPayloadByteCounts) {
requestBytes += size + 1 + makeRecord.nonPayloadOverhead;
}
return requestBytes;
}
function makePostQueue(config, lastModTime, responseGenerator) {
let stats = {
posts: [],
}
batches: [],
};
let poster = (data, headers, batch, commit) => {
let thisPost = { nbytes: data.length, batch, commit };
let payloadBytes = 0;
let numRecords = 0;
for (let record of JSON.parse(data)) {
if (config.max_record_payload_bytes) {
less(record.payload.length, config.max_record_payload_bytes,
"PostQueue should respect max_record_payload_bytes");
}
payloadBytes += record.payload.length;
++numRecords;
}
let thisPost = {
nbytes: data.length,
batch,
commit,
payloadBytes,
numRecords
};
if (headers.length) {
thisPost.headers = headers;
}
// check that we respected the provided limits for the post
if (config.max_post_records) {
lessOrEqual(numRecords, config.max_post_records, "PostQueue should respect max_post_records");
}
if (config.max_post_bytes) {
less(payloadBytes, config.max_post_bytes, "PostQueue should respect max_post_bytes");
}
if (config.max_request_bytes) {
less(thisPost.nbytes, config.max_request_bytes, "PostQueue should respect max_request_bytes");
}
stats.posts.push(thisPost);
return Promise.resolve(responseGenerator.next().value);
// Call this now so we can check if there's a batch id in it.
// Kind of kludgey, but allows us to have the correct batch id even
// before the next post is made.
let nextResponse = responseGenerator.next().value;
// Record info for the batch.
let curBatch = stats.batches[stats.batches.length - 1];
// If there's no current batch, we requested a new one, or the previous
// batch committed, then we need to start a new one.
if (!curBatch || batch == "true" || curBatch.didCommit) {
curBatch = {
posts: 0,
payloadBytes: 0,
numRecords: 0,
didCommit: false,
batch,
serverBatch: false
};
if (nextResponse.obj && nextResponse.obj.batch) {
curBatch.batch = nextResponse.obj.batch;
curBatch.serverBatch = true;
}
stats.batches.push(curBatch);
}
// If we provided a batch id, it must be the same as the current batch
if (batch && batch != "true") {
equal(curBatch.batch, batch);
}
curBatch.posts += 1;
curBatch.payloadBytes += payloadBytes;
curBatch.numRecords += numRecords;
curBatch.didCommit = commit;
// if this is an actual server batch (or it's a one-shot batch), check that
// we respected the provided total limits
if (commit && (batch == "true" || curBatch.serverBatch)) {
if (config.max_total_records) {
lessOrEqual(curBatch.numRecords,
config.max_total_records, "PostQueue should respect max_total_records");
}
if (config.max_total_bytes) {
less(curBatch.payloadBytes,
config.max_total_bytes, "PostQueue should respect max_total_bytes");
}
}
return Promise.resolve(nextResponse);
}
let done = () => {}
@@ -32,10 +130,7 @@ function makePostQueue(config, lastModTime, responseGenerator) {
add_task(async function test_simple() {
let config = {
max_post_bytes: 1000,
max_post_records: 100,
max_batch_bytes: Infinity,
max_batch_records: Infinity,
max_request_bytes: 1000,
max_record_payload_bytes: 1000,
}
@@ -50,20 +145,28 @@ add_task(async function test_simple() {
await pq.flush(true);
deepEqual(stats.posts, [{
nbytes: 12, // expect our 10 byte record plus "[]" to wrap it.
nbytes: requestBytesFor([10]),
payloadBytes: 10,
numRecords: 1,
commit: true, // we don't know if we have batch semantics, so committed.
headers: [["x-if-unmodified-since", time]],
batch: "true"}]);
batch: "true"
}]);
deepEqual(stats.batches, [{
posts: 1,
payloadBytes: 10,
numRecords: 1,
didCommit: true,
batch: "true",
serverBatch: false
}])
});
// Test we do the right thing when we need to make multiple posts when there
// are no batch semantics
add_task(async function test_max_post_bytes_no_batch() {
add_task(async function test_max_request_bytes_no_batch() {
let config = {
max_post_bytes: 50,
max_post_records: Infinity,
max_batch_bytes: Infinity,
max_batch_records: Infinity,
max_request_bytes: 50,
max_record_payload_bytes: 50,
}
@@ -74,33 +177,35 @@ add_task(async function test_max_post_bytes_no_batch() {
}
let { pq, stats } = makePostQueue(config, time, responseGenerator());
await pq.enqueue(makeRecord(20)); // total size now 22 bytes - "[" + record + "]"
await pq.enqueue(makeRecord(20)); // total size now 43 bytes - "[" + record + "," + record + "]"
await pq.enqueue(makeRecord(20)); // this will exceed our byte limit, so will be in the 2nd POST.
let payloadSize = 20 - makeRecord.nonPayloadOverhead;
await pq.enqueue(makeRecord(payloadSize)); // total size now 22 bytes - "[" + record + "]"
await pq.enqueue(makeRecord(payloadSize)); // total size now 43 bytes - "[" + record + "," + record + "]"
await pq.enqueue(makeRecord(payloadSize)); // this will exceed our byte limit, so will be in the 2nd POST.
await pq.flush(true);
deepEqual(stats.posts, [
{
nbytes: 43, // 43 for the first post
nbytes: 43, // 43 for the first part
payloadBytes: payloadSize * 2,
numRecords: 2,
commit: false,
headers: [["x-if-unmodified-since", time]],
batch: "true",
}, {
nbytes: 22,
payloadBytes: payloadSize,
numRecords: 1,
commit: false, // we know we aren't in a batch, so never commit.
headers: [["x-if-unmodified-since", time + 100]],
batch: null,
}
]);
equal(stats.batches.filter(x => x.didCommit).length, 0)
equal(pq.lastModified, time + 200);
});
add_task(async function test_max_record_payload_bytes_no_batch() {
let config = {
max_post_bytes: 100,
max_post_records: Infinity,
max_batch_bytes: Infinity,
max_batch_records: Infinity,
max_request_bytes: 100,
max_record_payload_bytes: 50,
}
@@ -111,31 +216,49 @@ add_task(async function test_max_record_payload_bytes_no_batch() {
}
let { pq, stats } = makePostQueue(config, time, responseGenerator());
await pq.enqueue(makeRecord(50)); // total size now 52 bytes - "[" + record + "]"
await pq.enqueue(makeRecord(46)); // total size now 99 bytes - "[" + record0 + "," + record1 + "]"
// Should trigger when the record really is too large to fit
let {enqueued} = await pq.enqueue(makeRecord(51));
ok(!enqueued)
// Shouldn't trigger when the encoded record is too big
ok((await pq.enqueue(makeRecord(50 - makeRecord.nonPayloadOverhead))).enqueued); // total size now 52 bytes - "[" + record + "]"
ok((await pq.enqueue(makeRecord(46 - makeRecord.nonPayloadOverhead))).enqueued); // total size now 99 bytes - "[" + record0 + "," + record1 + "]"
await pq.flush(true);
deepEqual(stats.posts, [
{
nbytes: 99,
payloadBytes: 50 + 46 - makeRecord.nonPayloadOverhead * 2,
numRecords: 2,
commit: true, // we don't know if we have batch semantics, so committed.
batch: "true",
headers: [["x-if-unmodified-since", time]],
}
]);
deepEqual(stats.batches, [
{
posts: 1,
payloadBytes: 50 + 46 - makeRecord.nonPayloadOverhead * 2,
numRecords: 2,
didCommit: true,
batch: "true",
serverBatch: false
}
]);
equal(pq.lastModified, time + 100);
});
// Batch tests.
// Test making a single post when batch semantics are in place.
add_task(async function test_single_batch() {
let config = {
max_post_bytes: 1000,
max_post_records: 100,
max_batch_bytes: 2000,
max_batch_records: 200,
max_total_records: 200,
max_record_payload_bytes: 1000,
}
const time = 11111111;
@@ -151,23 +274,35 @@ add_task(async function test_single_batch() {
deepEqual(stats.posts, [
{
nbytes: 12, // expect our 10 byte record plus "[]" to wrap it.
nbytes: requestBytesFor([10]),
numRecords: 1,
payloadBytes: 10,
commit: true, // we don't know if we have batch semantics, so committed.
batch: "true",
headers: [["x-if-unmodified-since", time]],
}
]);
deepEqual(stats.batches, [{
posts: 1,
payloadBytes: 10,
numRecords: 1,
didCommit: true,
batch: 1234,
serverBatch: true
}]);
});
// Test we do the right thing when we need to make multiple posts when there
// are batch semantics in place.
// Test we do the right thing when we need to make multiple posts due to
// max_post_bytes when there are batch semantics in place.
add_task(async function test_max_post_bytes_batch() {
let config = {
max_post_bytes: 50,
max_post_records: 4,
max_batch_bytes: 5000,
max_batch_records: 100,
max_total_bytes: 5000,
max_total_records: 100,
max_record_payload_bytes: 50,
max_request_bytes: 4000,
}
const time = 11111111;
@@ -181,36 +316,112 @@ add_task(async function test_max_post_bytes_batch() {
}
let { pq, stats } = makePostQueue(config, time, responseGenerator());
ok((await pq.enqueue(makeRecord(20))).enqueued); // total size now 22 bytes - "[" + record + "]"
ok((await pq.enqueue(makeRecord(20))).enqueued); // total size now 43 bytes - "[" + record + "," + record + "]"
ok((await pq.enqueue(makeRecord(20))).enqueued); // this will exceed our byte limit, so will be in the 2nd POST.
ok((await pq.enqueue(makeRecord(20))).enqueued); // 20
ok((await pq.enqueue(makeRecord(20))).enqueued); // 40
// 60 would overflow, so post
ok((await pq.enqueue(makeRecord(20))).enqueued); // 20
await pq.flush(true);
deepEqual(stats.posts, [
{
nbytes: 43, // 43 for the first post
nbytes: requestBytesFor([20, 20]),
payloadBytes: 40,
numRecords: 2,
commit: false,
batch: "true",
headers: [["x-if-unmodified-since", time]],
}, {
nbytes: 22,
nbytes: requestBytesFor([20]),
payloadBytes: 20,
numRecords: 1,
commit: true,
batch: 1234,
headers: [["x-if-unmodified-since", time]],
}
]);
deepEqual(stats.batches, [{
posts: 2,
payloadBytes: 60,
numRecords: 3,
didCommit: true,
batch: 1234,
serverBatch: true
}]);
equal(pq.lastModified, time + 200);
});
// Test we do the right thing when we need to make multiple posts due to
// max_request_bytes when there are batch semantics in place.
add_task(async function test_max_request_bytes_batch() {
let config = {
max_post_bytes: 60,
max_post_records: 40,
max_total_bytes: 5000,
max_total_records: 100,
max_record_payload_bytes: 500,
max_request_bytes: 100,
};
const time = 11111111;
function* responseGenerator() {
yield { success: true, status: 202, obj: { batch: 1234 },
headers: { "x-last-modified": time, "x-weave-timestamp": time + 100 },
};
yield { success: true, status: 202, obj: { batch: 1234 },
headers: { "x-last-modified": time + 200, "x-weave-timestamp": time + 200 },
};
}
let { pq, stats } = makePostQueue(config, time, responseGenerator());
ok((await pq.enqueue(makeRecord(10))).enqueued); // post: 10, request: 26 (10 + 14 + 2)
ok((await pq.enqueue(makeRecord(10))).enqueued); // post: 20, request: 51 (10 + 14 + 1) * 2 + 1
ok((await pq.enqueue(makeRecord(10))).enqueued); // post: 30, request: 76 (10 + 14 + 1) * 3 + 1
// 1 more would be post: 40 (fine), request: 101, so we should post.
ok((await pq.enqueue(makeRecord(10))).enqueued);
await pq.flush(true);
deepEqual(stats.posts, [
{
nbytes: requestBytesFor([10, 10, 10]),
payloadBytes: 30,
numRecords: 3,
commit: false,
batch: "true",
headers: [["x-if-unmodified-since", time]],
}, {
nbytes: requestBytesFor([10]),
payloadBytes: 10,
numRecords: 1,
commit: true,
batch: 1234,
headers: [["x-if-unmodified-since", time]],
}
]);
deepEqual(stats.batches, [{
posts: 2,
payloadBytes: 40,
numRecords: 4,
didCommit: true,
batch: 1234,
serverBatch: true
}]);
equal(pq.lastModified, time + 200);
});
// Test we do the right thing when the batch bytes limit is exceeded.
add_task(async function test_max_batch_bytes_batch() {
add_task(async function test_max_total_bytes_batch() {
let config = {
max_post_bytes: 50,
max_post_records: 20,
max_batch_bytes: 70,
max_batch_records: 100,
max_total_bytes: 70,
max_total_records: 100,
max_record_payload_bytes: 50,
max_request_bytes: 500,
}
const time0 = 11111111;
@@ -231,100 +442,85 @@ add_task(async function test_max_batch_bytes_batch() {
}
let { pq, stats } = makePostQueue(config, time0, responseGenerator());
ok((await pq.enqueue(makeRecord(20))).enqueued); // total size now 22 bytes - "[" + record + "]"
ok((await pq.enqueue(makeRecord(20))).enqueued); // total size now 43 bytes - "[" + record + "," + record + "]"
ok((await pq.enqueue(makeRecord(20))).enqueued); // payloads = post: 20, batch: 20
ok((await pq.enqueue(makeRecord(20))).enqueued); // payloads = post: 40, batch: 40
// this will exceed our POST byte limit, so will be in the 2nd POST - but still in the first batch.
ok((await pq.enqueue(makeRecord(20))).enqueued); // 22 bytes for 2nd post, 55 bytes in the batch.
ok((await pq.enqueue(makeRecord(20))).enqueued); // payloads = post: 20, batch: 60
// this will exceed our batch byte limit, so will be in a new batch.
ok((await pq.enqueue(makeRecord(20))).enqueued); // 22 bytes in 3rd post/2nd batch
ok((await pq.enqueue(makeRecord(20))).enqueued); // 43 bytes in 3rd post/2nd batch
ok((await pq.enqueue(makeRecord(20))).enqueued); // payloads = post: 20, batch: 20
ok((await pq.enqueue(makeRecord(20))).enqueued); // payloads = post: 40, batch: 40
// This will exceed POST byte limit, so will be in the 4th post, part of the 2nd batch.
ok((await pq.enqueue(makeRecord(20))).enqueued); // 22 bytes for 4th post/2nd batch
ok((await pq.enqueue(makeRecord(20))).enqueued); // payloads = post: 20, batch: 60
await pq.flush(true);
deepEqual(stats.posts, [
{
nbytes: 43, // 43 for the first post
nbytes: requestBytesFor([20, 20]),
payloadBytes: 40,
numRecords: 2,
commit: false,
batch: "true",
headers: [["x-if-unmodified-since", time0]],
}, {
// second post of 22 bytes in the first batch, committing it.
nbytes: 22,
nbytes: requestBytesFor([20]),
payloadBytes: 20,
numRecords: 1,
commit: true,
batch: 1234,
headers: [["x-if-unmodified-since", time0]],
}, {
// 3rd post of 43 bytes in a new batch, not yet committing it.
nbytes: 43,
nbytes: requestBytesFor([20, 20]),
payloadBytes: 40,
numRecords: 2,
commit: false,
batch: "true",
headers: [["x-if-unmodified-since", time1]],
}, {
// 4th post of 22 bytes in second batch, committing it.
nbytes: 22,
nbytes: requestBytesFor([20]),
payloadBytes: 20,
numRecords: 1,
commit: true,
batch: 5678,
headers: [["x-if-unmodified-since", time1]],
},
]);
deepEqual(stats.batches, [
{
posts: 2,
payloadBytes: 60,
numRecords: 3,
didCommit: true,
batch: 1234,
serverBatch: true
},
{
posts: 2,
payloadBytes: 60,
numRecords: 3,
didCommit: true,
batch: 5678,
serverBatch: true
}
]);
equal(pq.lastModified, time1 + 200);
});
// Test we split up the posts when we exceed the record limit when batch semantics
// are in place.
add_task(async function test_max_post_bytes_batch() {
add_task(async function test_max_post_records_batch() {
let config = {
max_post_bytes: 1000,
max_post_records: 2,
max_batch_bytes: 5000,
max_batch_records: 100,
max_total_bytes: 5000,
max_total_records: 100,
max_record_payload_bytes: 1000,
}
const time = 11111111;
function* responseGenerator() {
yield { success: true, status: 202, obj: { batch: 1234 },
headers: { "x-last-modified": time, "x-weave-timestamp": time + 100 },
};
yield { success: true, status: 202, obj: { batch: 1234 },
headers: { "x-last-modified": time + 200, "x-weave-timestamp": time + 200 },
};
}
let { pq, stats } = makePostQueue(config, time, responseGenerator());
ok((await pq.enqueue(makeRecord(20))).enqueued); // total size now 22 bytes - "[" + record + "]"
ok((await pq.enqueue(makeRecord(20))).enqueued); // total size now 43 bytes - "[" + record + "," + record + "]"
ok((await pq.enqueue(makeRecord(20))).enqueued); // will exceed record limit, so will be in 2nd post.
await pq.flush(true);
deepEqual(stats.posts, [
{
nbytes: 43, // 43 for the first post
commit: false,
batch: "true",
headers: [["x-if-unmodified-since", time]],
}, {
nbytes: 22,
commit: true,
batch: 1234,
headers: [["x-if-unmodified-since", time]],
}
]);
equal(pq.lastModified, time + 200);
});
// Test that a single huge record fails to enqueue
add_task(async function test_huge_record() {
let config = {
max_post_bytes: 50,
max_post_records: 100,
max_batch_bytes: 5000,
max_batch_records: 100,
max_record_payload_bytes: 50,
}
max_request_bytes: 1000,
};
const time = 11111111;
function* responseGenerator() {
@@ -338,32 +534,40 @@ add_task(async function test_huge_record() {
let { pq, stats } = makePostQueue(config, time, responseGenerator());
ok((await pq.enqueue(makeRecord(20))).enqueued);
let { enqueued, error } = await pq.enqueue(makeRecord(1000));
ok(!enqueued);
notEqual(error, undefined);
// make sure that we keep working, skipping the bad record entirely
// (handling the error the queue reported is left up to caller)
ok((await pq.enqueue(makeRecord(20))).enqueued);
// will exceed our max_post_bytes limit (50), so will be in the 2nd post.
ok((await pq.enqueue(makeRecord(20))).enqueued);
await pq.flush(true);
deepEqual(stats.posts, [
{
nbytes: 43, // 43 for the first post
nbytes: requestBytesFor([20, 20]),
numRecords: 2,
payloadBytes: 40,
commit: false,
batch: "true",
headers: [["x-if-unmodified-since", time]],
}, {
nbytes: 22,
nbytes: requestBytesFor([20]),
numRecords: 1,
payloadBytes: 20,
commit: true,
batch: 1234,
headers: [["x-if-unmodified-since", time]],
}
]);
deepEqual(stats.batches, [{
posts: 2,
payloadBytes: 60,
numRecords: 3,
batch: 1234,
serverBatch: true,
didCommit: true,
}]);
equal(pq.lastModified, time + 200);
});
@@ -372,9 +576,10 @@ add_task(async function test_max_records_batch() {
let config = {
max_post_bytes: 1000,
max_post_records: 3,
max_batch_bytes: 10000,
max_batch_records: 5,
max_total_bytes: 10000,
max_total_records: 5,
max_record_payload_bytes: 1000,
max_request_bytes: 10000,
}
const time0 = 11111111;
@@ -413,40 +618,68 @@ add_task(async function test_max_records_batch() {
deepEqual(stats.posts, [
{ // 3 records
nbytes: 64,
nbytes: requestBytesFor([20, 20, 20]),
payloadBytes: 60,
numRecords: 3,
commit: false,
batch: "true",
headers: [["x-if-unmodified-since", time0]],
}, { // 2 records -- end batch1
nbytes: 43,
nbytes: requestBytesFor([20, 20]),
payloadBytes: 40,
numRecords: 2,
commit: true,
batch: 1234,
headers: [["x-if-unmodified-since", time0]],
}, { // 3 records
nbytes: 64,
nbytes: requestBytesFor([20, 20, 20]),
payloadBytes: 60,
numRecords: 3,
commit: false,
batch: "true",
headers: [["x-if-unmodified-since", time1]],
}, { // 1 record -- end batch2
nbytes: 22,
nbytes: requestBytesFor([20]),
payloadBytes: 20,
numRecords: 1,
commit: true,
batch: 5678,
headers: [["x-if-unmodified-since", time1]],
},
]);
deepEqual(stats.batches, [{
posts: 2,
payloadBytes: 100,
numRecords: 5,
batch: 1234,
serverBatch: true,
didCommit: true,
}, {
posts: 2,
payloadBytes: 80,
numRecords: 4,
batch: 5678,
serverBatch: true,
didCommit: true,
}]);
equal(pq.lastModified, time1 + 200);
});
// Test we do the right thing when the batch byte limit is met but not exceeded.
// Test we do the right thing when the limits are met but not exceeded.
add_task(async function test_packed_batch() {
let config = {
max_post_bytes: 54,
max_post_bytes: 41,
max_post_records: 4,
max_batch_bytes: 107,
max_batch_records: 100,
max_record_payload_bytes: 25,
}
max_total_bytes: 81,
max_total_records: 8,
max_record_payload_bytes: 20 + makeRecord.nonPayloadOverhead + 1,
max_request_bytes: requestBytesFor([10, 10, 10, 10]) + 1,
};
const time = 11111111;
function* responseGenerator() {
@@ -459,25 +692,135 @@ add_task(async function test_packed_batch() {
}
let { pq, stats } = makePostQueue(config, time, responseGenerator());
ok((await pq.enqueue(makeRecord(25))).enqueued); // total size now 27 bytes - "[" + record + "]"
ok((await pq.enqueue(makeRecord(25))).enqueued); // total size now 53 bytes - "[" + record + "," + record + "]"
ok((await pq.enqueue(makeRecord(25))).enqueued); // this will exceed our byte limit, so will be in the 2nd POST.
ok((await pq.enqueue(makeRecord(25))).enqueued);
ok((await pq.enqueue(makeRecord(10))).enqueued);
ok((await pq.enqueue(makeRecord(10))).enqueued);
ok((await pq.enqueue(makeRecord(10))).enqueued);
ok((await pq.enqueue(makeRecord(10))).enqueued);
ok((await pq.enqueue(makeRecord(10))).enqueued);
ok((await pq.enqueue(makeRecord(10))).enqueued);
ok((await pq.enqueue(makeRecord(10))).enqueued);
ok((await pq.enqueue(makeRecord(10))).enqueued);
await pq.flush(true);
deepEqual(stats.posts, [
{
nbytes: 53,
nbytes: requestBytesFor([10, 10, 10, 10]),
numRecords: 4,
payloadBytes: 40,
commit: false,
batch: "true",
headers: [["x-if-unmodified-since", time]],
}, {
nbytes: 53,
nbytes: requestBytesFor([10, 10, 10, 10]),
numRecords: 4,
payloadBytes: 40,
commit: true,
batch: 1234,
headers: [["x-if-unmodified-since", time]],
}
]);
deepEqual(stats.batches, [{
posts: 2,
payloadBytes: 80,
numRecords: 8,
batch: 1234,
serverBatch: true,
didCommit: true,
}]);
equal(pq.lastModified, time + 200);
});
// Helper for tests that check that a single record fails to enqueue for the provided config
async function test_enqueue_failure_case(failureLimit, config) {
const time = 11111111;
function* responseGenerator() {
yield { success: true, status: 202, obj: { batch: 1234 },
headers: { "x-last-modified": time + 100, "x-weave-timestamp": time + 100 },
};
}
let { pq, stats } = makePostQueue(config, time, responseGenerator());
// Check on empty postqueue
let result = await pq.enqueue(makeRecord(failureLimit + 1));
ok(!result.enqueued);
notEqual(result.error, undefined);
ok((await pq.enqueue(makeRecord(5))).enqueued);
// Check on nonempty postqueue
result = await pq.enqueue(makeRecord(failureLimit + 1));
ok(!result.enqueued);
notEqual(result.error, undefined);
// make sure that we keep working, skipping the bad record entirely
// (handling the error the queue reported is left up to caller)
ok((await pq.enqueue(makeRecord(5))).enqueued);
await pq.flush(true);
deepEqual(stats.posts, [
{
nbytes: requestBytesFor([5, 5]),
numRecords: 2,
payloadBytes: 10,
commit: true,
batch: "true",
headers: [["x-if-unmodified-since", time]],
}
]);
deepEqual(stats.batches, [{
posts: 1,
payloadBytes: 10,
numRecords: 2,
batch: 1234,
serverBatch: true,
didCommit: true,
}]);
equal(pq.lastModified, time + 100);
}
add_task(async function test_max_post_bytes_enqueue_failure() {
await test_enqueue_failure_case(50, {
max_post_bytes: 50,
max_post_records: 100,
max_total_bytes: 5000,
max_total_records: 100,
max_record_payload_bytes: 500,
max_request_bytes: 500,
});
});
add_task(async function test_max_request_bytes_enqueue_failure() {
await test_enqueue_failure_case(50, {
max_post_bytes: 500,
max_post_records: 100,
max_total_bytes: 5000,
max_total_records: 100,
max_record_payload_bytes: 500,
max_request_bytes: 50,
});
});
add_task(async function test_max_record_payload_bytes_enqueue_failure() {
await test_enqueue_failure_case(50, {
max_post_bytes: 500,
max_post_records: 100,
max_total_bytes: 5000,
max_total_records: 100,
max_record_payload_bytes: 50,
max_request_bytes: 500,
});
});