Commit f3e896b4 authored by Alexandre Frechette's avatar Alexandre Frechette Committed by Commit Bot

Update "to-be-uploaded" actions before uploading them instead of in a failure state.

Bug: 1065431
Change-Id: Icd8d9dbb98b5408a2d3de383baa1d26c4134c81d
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2124737Reviewed-by: default avatarDan H <harringtond@chromium.org>
Commit-Queue: Alexandre Frechette <frechette@chromium.org>
Cr-Commit-Position: refs/heads/master@{#756425}
parent ec3307c5
...@@ -52,7 +52,9 @@ public final class FeedActionUploadRequestManager implements ActionUploadRequest ...@@ -52,7 +52,9 @@ public final class FeedActionUploadRequestManager implements ActionUploadRequest
private final long mMaxActionUploadAttempts; private final long mMaxActionUploadAttempts;
// Total number of actions that can be uploaded in a chained batch request. // Total number of actions that can be uploaded in a chained batch request.
private final long mMaxActionsUploadsPerBatchedRequest; private final long mMaxActionsUploadsPerBatchedRequest;
// Maximum bytes that can be uploaded in a single request. // Maximum bytes of StreamUploadableAction that can be uploaded in a single request. The actual
// request is made of ActionPayloads and SemanticProperties, not StreamUploadableActions, so
// this serves as a proxy.
private final long mMaxBytesPerRequest; private final long mMaxBytesPerRequest;
private final long mMaxActionUploadTtl; private final long mMaxActionUploadTtl;
...@@ -99,19 +101,56 @@ public final class FeedActionUploadRequestManager implements ActionUploadRequest ...@@ -99,19 +101,56 @@ public final class FeedActionUploadRequestManager implements ActionUploadRequest
UploadableActionsRequestBuilder requestBuilder = UploadableActionsRequestBuilder requestBuilder =
new UploadableActionsRequestBuilder(mProtocolAdapter); new UploadableActionsRequestBuilder(mProtocolAdapter);
int actionPayloadBytes = 0; int actionPayloadBytes = 0;
Set<StreamUploadableAction> actionsToUpload = new HashSet<>(); Set<StreamUploadableAction> actionsToUpload = new HashSet<>();
Set<StreamUploadableAction> actionsRemaining = new HashSet<>();
ArrayList<String> contentIds = new ArrayList<>(); ArrayList<String> contentIds = new ArrayList<>();
UploadableActionMutation actionMutation = mStore.editUploadableActions();
// Select which actions to send in this request, and update their attempt count. Remaining
// actions are collected and sent next. Stale actions are removed from the store.
for (StreamUploadableAction action : actions) { for (StreamUploadableAction action : actions) {
if (isStale(action)) {
actionMutation.remove(action, action.getFeatureContentId());
continue;
}
int actionBytes = action.toByteArray().length; int actionBytes = action.toByteArray().length;
if (mMaxBytesPerRequest > actionPayloadBytes + actionBytes) { if (actionPayloadBytes + actionBytes < mMaxBytesPerRequest) {
actionsToUpload.add(action);
contentIds.add(action.getFeatureContentId());
actionPayloadBytes += actionBytes; actionPayloadBytes += actionBytes;
actionMutation.remove(action, action.getFeatureContentId());
StreamUploadableAction actionToUpload =
action.toBuilder()
.setUploadAttempts(action.getUploadAttempts() + 1)
.build();
actionMutation.upsert(actionToUpload, actionToUpload.getFeatureContentId());
actionsToUpload.add(actionToUpload);
contentIds.add(actionToUpload.getFeatureContentId());
} else { } else {
break; actionsRemaining.add(action);
} }
} }
CommitResult commitResult = actionMutation.commit();
if (commitResult != CommitResult.SUCCESS) {
Logger.e(TAG, "Upserting uploaded actions failed");
consumer.accept(Result.failure());
return;
}
if (actionsToUpload.isEmpty()) {
if (actionsRemaining.isEmpty()) {
// All actions were too stale to be uploaded.
consumer.accept(Result.success(token));
} else {
Logger.e(TAG, "No action fitted in the request.");
consumer.accept(Result.failure());
}
return;
}
Result<List<SemanticPropertiesWithId>> semanticPropertiesResult = Result<List<SemanticPropertiesWithId>> semanticPropertiesResult =
mStore.getSemanticProperties(contentIds); mStore.getSemanticProperties(contentIds);
List<SemanticPropertiesWithId> semanticPropertiesList = new ArrayList<>(); List<SemanticPropertiesWithId> semanticPropertiesList = new ArrayList<>();
...@@ -123,9 +162,8 @@ public final class FeedActionUploadRequestManager implements ActionUploadRequest ...@@ -123,9 +162,8 @@ public final class FeedActionUploadRequestManager implements ActionUploadRequest
Consumer<Result<ConsistencyToken>> tokenConsumer = result -> { Consumer<Result<ConsistencyToken>> tokenConsumer = result -> {
mThreadUtils.checkNotMainThread(); mThreadUtils.checkNotMainThread();
if (result.isSuccessful()) { if (result.isSuccessful()) {
actions.removeAll(actionsToUpload); if (!actionsRemaining.isEmpty()) {
if (!actions.isEmpty()) { triggerUploadActions(actionsRemaining, result.getValue(), consumer,
triggerUploadActions(actions, result.getValue(), consumer,
uploadCount + actionsToUpload.size()); uploadCount + actionsToUpload.size());
} else { } else {
consumer.accept(Result.success(result.getValue())); consumer.accept(Result.success(result.getValue()));
...@@ -188,7 +226,6 @@ public final class FeedActionUploadRequestManager implements ActionUploadRequest ...@@ -188,7 +226,6 @@ public final class FeedActionUploadRequestManager implements ActionUploadRequest
mExtensionRegistry.getExtensionRegistry()); mExtensionRegistry.getExtensionRegistry());
} catch (IOException e) { } catch (IOException e) {
Logger.e(TAG, e, "Response parse failed"); Logger.e(TAG, e, "Response parse failed");
handleUpdatingActionsOnFailure(actions);
consumer.accept(Result.failure()); consumer.accept(Result.failure());
return; return;
} }
...@@ -208,31 +245,15 @@ public final class FeedActionUploadRequestManager implements ActionUploadRequest ...@@ -208,31 +245,15 @@ public final class FeedActionUploadRequestManager implements ActionUploadRequest
} }
} else { } else {
contextResult = Result.failure(); contextResult = Result.failure();
handleUpdatingActionsOnFailure(actions);
} }
consumer.accept(contextResult); consumer.accept(contextResult);
}); });
} }
private void handleUpdatingActionsOnFailure(Set<StreamUploadableAction> actions) { private boolean isStale(StreamUploadableAction action) {
UploadableActionMutation actionMutation = mStore.editUploadableActions(); int uploadAttempts = action.getUploadAttempts();
for (StreamUploadableAction action : actions) { long currentTime = TimeUnit.MILLISECONDS.toSeconds(mClock.currentTimeMillis());
int uploadAttempts = action.getUploadAttempts(); long timeSinceUpload = currentTime - action.getTimestampSeconds();
long currentTime = TimeUnit.MILLISECONDS.toSeconds(mClock.currentTimeMillis()); return uploadAttempts >= mMaxActionUploadAttempts || timeSinceUpload > mMaxActionUploadTtl;
long timeSinceUpload = currentTime - action.getTimestampSeconds();
if (uploadAttempts < mMaxActionUploadAttempts
&& timeSinceUpload < mMaxActionUploadTtl) {
actionMutation.upsert(
action.toBuilder().setUploadAttempts(uploadAttempts + 1).build(),
action.getFeatureContentId());
} else {
actionMutation.remove(action, action.getFeatureContentId());
}
CommitResult commitResult = actionMutation.commit();
if (commitResult != CommitResult.SUCCESS) {
// TODO:log failure to the basicLoggingApi
Logger.e(TAG, "Upserting actions on failure failed");
}
}
} }
} }
...@@ -51,6 +51,7 @@ import java.nio.ByteBuffer; ...@@ -51,6 +51,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set; import java.util.Set;
/** Test of the {@link FeedActionUploadRequestManager} class. */ /** Test of the {@link FeedActionUploadRequestManager} class. */
...@@ -72,6 +73,8 @@ public class FeedActionUploadRequestManagerTest { ...@@ -72,6 +73,8 @@ public class FeedActionUploadRequestManagerTest {
.build(); .build();
private static final String CONTENT_ID = "contentId"; private static final String CONTENT_ID = "contentId";
private static final String CONTENT_ID_2 = "contentId2"; private static final String CONTENT_ID_2 = "contentId2";
private static final String CONTENT_ID_LONG =
"extremely-long-content-id-that-should-take-a-lot-of-bytes";
private static final byte[] SEMANTIC_PROPERTIES_BYTES = new byte[] {0x1, 0xa}; private static final byte[] SEMANTIC_PROPERTIES_BYTES = new byte[] {0x1, 0xa};
private static final SemanticProperties SEMANTIC_PROPERTIES = private static final SemanticProperties SEMANTIC_PROPERTIES =
SemanticProperties.newBuilder() SemanticProperties.newBuilder()
...@@ -176,7 +179,7 @@ public class FeedActionUploadRequestManagerTest { ...@@ -176,7 +179,7 @@ public class FeedActionUploadRequestManagerTest {
.put(ConfigKey.FEED_ACTION_SERVER_METHOD, HttpMethod.GET) .put(ConfigKey.FEED_ACTION_SERVER_METHOD, HttpMethod.GET)
.put(ConfigKey.FEED_ACTION_TTL_SECONDS, 1000L) .put(ConfigKey.FEED_ACTION_TTL_SECONDS, 1000L)
.put(ConfigKey.FEED_ACTION_SERVER_MAX_SIZE_PER_REQUEST, 20L) .put(ConfigKey.FEED_ACTION_SERVER_MAX_SIZE_PER_REQUEST, 20L)
.put(ConfigKey.FEED_ACTION_MAX_UPLOAD_ATTEMPTS, 2L) .put(ConfigKey.FEED_ACTION_MAX_UPLOAD_ATTEMPTS, 1L)
.put(ConfigKey.FEED_ACTION_SERVER_MAX_ACTIONS_PER_REQUEST, 2L) .put(ConfigKey.FEED_ACTION_SERVER_MAX_ACTIONS_PER_REQUEST, 2L)
.build(); .build();
mRequestManager = createRequestManager(configuration); mRequestManager = createRequestManager(configuration);
...@@ -247,7 +250,7 @@ public class FeedActionUploadRequestManagerTest { ...@@ -247,7 +250,7 @@ public class FeedActionUploadRequestManagerTest {
} }
@Test @Test
public void testTriggerUploadActions_batchFirstReachesMax() throws Exception { public void testTriggerUploadActions_batchFirstReachesMaxNumActions() throws Exception {
Configuration configuration = Configuration configuration =
new Configuration.Builder() new Configuration.Builder()
.put(ConfigKey.FEED_ACTION_SERVER_METHOD, HttpMethod.GET) .put(ConfigKey.FEED_ACTION_SERVER_METHOD, HttpMethod.GET)
...@@ -271,6 +274,53 @@ public class FeedActionUploadRequestManagerTest { ...@@ -271,6 +274,53 @@ public class FeedActionUploadRequestManagerTest {
assertThat(mConsumer.isCalled()).isTrue(); assertThat(mConsumer.isCalled()).isTrue();
} }
@Test
public void testTriggerUploadActions_batchFirstReachesMaxSize() throws Exception {
Configuration configuration =
new Configuration.Builder()
.put(ConfigKey.FEED_ACTION_SERVER_METHOD, HttpMethod.GET)
.put(ConfigKey.FEED_ACTION_TTL_SECONDS, 1000L)
.put(ConfigKey.FEED_ACTION_MAX_UPLOAD_ATTEMPTS, 2L)
.put(ConfigKey.FEED_ACTION_SERVER_MAX_SIZE_PER_REQUEST, 20L)
.put(ConfigKey.FEED_ACTION_SERVER_MAX_ACTIONS_PER_REQUEST, 1L)
.build();
mRequestManager = createRequestManager(configuration);
Set<StreamUploadableAction> actionSet = orderedSetOf(
StreamUploadableAction.newBuilder().setFeatureContentId(CONTENT_ID_LONG).build(),
StreamUploadableAction.newBuilder().setFeatureContentId(CONTENT_ID).build());
mConsumer = new RequiredConsumer<>(input -> {
mFakeThreadUtils.checkNotMainThread();
assertThat(input.isSuccessful()).isTrue();
assertThat(input.getValue().toByteArray()).isEqualTo(TOKEN_2.toByteArray());
});
mFakeNetworkClient.addResponse(createHttpResponse(/* responseCode= */ 200, RESPONSE_1));
mRequestManager.triggerUploadActions(actionSet, TOKEN_1, mConsumer);
assertThat(mConsumer.isCalled()).isTrue();
}
@Test
public void testTriggerUploadActions_batchNoUploadableActions() throws Exception {
Configuration configuration =
new Configuration.Builder()
.put(ConfigKey.FEED_ACTION_SERVER_METHOD, HttpMethod.GET)
.put(ConfigKey.FEED_ACTION_TTL_SECONDS, 1000L)
.put(ConfigKey.FEED_ACTION_MAX_UPLOAD_ATTEMPTS, 2L)
.put(ConfigKey.FEED_ACTION_SERVER_MAX_SIZE_PER_REQUEST, 20L)
.put(ConfigKey.FEED_ACTION_SERVER_MAX_ACTIONS_PER_REQUEST, 1L)
.build();
mRequestManager = createRequestManager(configuration);
Set<StreamUploadableAction> actionSet = setOf(
StreamUploadableAction.newBuilder().setFeatureContentId(CONTENT_ID_LONG).build());
mConsumer = new RequiredConsumer<>(input -> {
mFakeThreadUtils.checkNotMainThread();
assertThat(input.isSuccessful()).isFalse();
});
mRequestManager.triggerUploadActions(actionSet, TOKEN_1, mConsumer);
assertThat(mConsumer.isCalled()).isTrue();
}
@Test @Test
public void testTriggerUploadActions_getMethod() throws Exception { public void testTriggerUploadActions_getMethod() throws Exception {
Configuration configuration = Configuration configuration =
...@@ -321,9 +371,14 @@ public class FeedActionUploadRequestManagerTest { ...@@ -321,9 +371,14 @@ public class FeedActionUploadRequestManagerTest {
ActionRequest request = getActionRequestFromHttpRequestBody(httpRequest); ActionRequest request = getActionRequestFromHttpRequestBody(httpRequest);
UploadableActionsRequestBuilder builder = UploadableActionsRequestBuilder builder =
new UploadableActionsRequestBuilder(mFakeProtocolAdapter); new UploadableActionsRequestBuilder(mFakeProtocolAdapter);
Set<StreamUploadableAction> expectedActionSet =
setOf(StreamUploadableAction.newBuilder()
.setFeatureContentId(CONTENT_ID)
.setUploadAttempts(1)
.build());
ActionRequest expectedRequest = ActionRequest expectedRequest =
builder.setConsistencyToken(ConsistencyToken.getDefaultInstance()) builder.setConsistencyToken(ConsistencyToken.getDefaultInstance())
.setActions(actionSet) .setActions(expectedActionSet)
.build(); .build();
assertThat(request.toByteArray()).isEqualTo(expectedRequest.toByteArray()); assertThat(request.toByteArray()).isEqualTo(expectedRequest.toByteArray());
...@@ -395,4 +450,10 @@ public class FeedActionUploadRequestManagerTest { ...@@ -395,4 +450,10 @@ public class FeedActionUploadRequestManagerTest {
Collections.addAll(result, items); Collections.addAll(result, items);
return result; return result;
} }
private static <T> Set<T> orderedSetOf(T... items) {
Set<T> result = new LinkedHashSet<>();
Collections.addAll(result, items);
return result;
}
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment