Move interleaved update with local read test to the new infra

Bug: 294875831
Test: MultiProcessDataStoreIpcTest
Change-Id: I06d3f796b6806b3027124b426c54ec8e8a1febda
diff --git a/datastore/datastore-core/src/androidAndroidTest/AndroidManifest.xml b/datastore/datastore-core/src/androidAndroidTest/AndroidManifest.xml
index 584f074..3b5e13a 100644
--- a/datastore/datastore-core/src/androidAndroidTest/AndroidManifest.xml
+++ b/datastore/datastore-core/src/androidAndroidTest/AndroidManifest.xml
@@ -17,16 +17,6 @@
 <manifest xmlns:android="http://schemas.android.com/apk/res/android">
     <application>
         <service
-            android:name="androidx.datastore.core.MultiProcessDataStoreMultiProcessTest$InterleavedUpdateDataWithReadFileService"
-            android:enabled="true"
-            android:exported="false"
-            android:process=":InterleavedUpdateDataWithReadFileService" />
-        <service
-            android:name="androidx.datastore.core.MultiProcessDataStoreMultiProcessTest$InterleavedUpdateDataWithReadOkioService"
-            android:enabled="true"
-            android:exported="false"
-            android:process=":InterleavedUpdateDataWithReadOkioService" />
-        <service
             android:name="androidx.datastore.core.MultiProcessDataStoreMultiProcessTest$FailedUpdateDataFileService"
             android:enabled="true"
             android:exported="false"
diff --git a/datastore/datastore-core/src/androidTest/java/androidx/datastore/core/MultiProcessDataStoreMultiProcessTest.kt b/datastore/datastore-core/src/androidTest/java/androidx/datastore/core/MultiProcessDataStoreMultiProcessTest.kt
index ab981eb..a496d96 100644
--- a/datastore/datastore-core/src/androidTest/java/androidx/datastore/core/MultiProcessDataStoreMultiProcessTest.kt
+++ b/datastore/datastore-core/src/androidTest/java/androidx/datastore/core/MultiProcessDataStoreMultiProcessTest.kt
@@ -38,9 +38,7 @@
 import kotlinx.coroutines.Job
 import kotlinx.coroutines.async
 import kotlinx.coroutines.cancelChildren
-import kotlinx.coroutines.delay
 import kotlinx.coroutines.flow.first
-import kotlinx.coroutines.newSingleThreadContext
 import kotlinx.coroutines.runBlocking
 import kotlinx.coroutines.test.TestScope
 import kotlinx.coroutines.test.UnconfinedTestDispatcher
@@ -148,98 +146,6 @@
     }
 
     @Test
-    fun testInterleavedUpdateDataWithLocalRead_file() =
-        testInterleavedUpdateDataWithLocalRead_runner(StorageVariant.FILE)
-
-    @Test
-    fun testInterleavedUpdateDataWithLocalRead_okio() =
-        testInterleavedUpdateDataWithLocalRead_runner(StorageVariant.OKIO)
-
-    private fun testInterleavedUpdateDataWithLocalRead_runner(variant: StorageVariant) =
-        runTest(UnconfinedTestDispatcher(), timeout = 10000.milliseconds) {
-            val testData: Bundle = createDataStoreBundle(testFile.absolutePath, variant)
-            val dataStore: DataStore<FooProto> =
-                createDataStore(testData, dataStoreScope, context = dataStoreContext)
-            val serviceClasses = mapOf(
-                StorageVariant.FILE to InterleavedUpdateDataWithReadFileService::class,
-                StorageVariant.OKIO to InterleavedUpdateDataWithReadOkioService::class
-            )
-            val connection: BlockingServiceConnection =
-                setUpService(
-                    mainContext,
-                    serviceClasses[variant]!!.java,
-                    testData
-                )
-
-            // Invalidate any local cache
-            assertThat(dataStore.data.first()).isEqualTo(DEFAULT_FOO)
-            signalService(connection)
-
-            // Queue and start local write
-            val writeStarted = CompletableDeferred<Unit>()
-            val finishWrite = CompletableDeferred<Unit>()
-
-            val write = async {
-                dataStore.updateData {
-                    writeStarted.complete(Unit)
-                    finishWrite.await()
-                    FOO_WITH_TEXT
-                }
-            }
-            writeStarted.await()
-
-            // Queue remote write
-            signalService(connection)
-
-            // Local uncached read; this should see data initially written remotely.
-            assertThat(dataStore.data.first()).isEqualTo(
-                FooProto.newBuilder().setInteger(1).build()
-            )
-
-            // Unblock writes; the local write is delayed to ensure the remote write remains blocked.
-            val remoteWrite = async(newSingleThreadContext("blockedWriter")) {
-                signalService(connection)
-            }
-
-            val localWrite = async(newSingleThreadContext("unblockLocalWrite")) {
-                delay(500)
-                finishWrite.complete(Unit)
-                write.await()
-            }
-
-            localWrite.await()
-            remoteWrite.await()
-
-            assertThat(dataStore.data.first()).isEqualTo(FOO_WITH_TEXT_AND_BOOLEAN)
-        }
-
-    open class InterleavedUpdateDataWithReadFileService(
-        private val scope: TestScope = TestScope(UnconfinedTestDispatcher() + Job())
-    ) : DirectTestService() {
-        override fun beforeTest(testData: Bundle) {
-            store = createDataStore(testData, scope)
-        }
-
-        override fun runTest() = runBlocking<Unit> {
-            store.updateData {
-                INCREMENT_INTEGER(it)
-            }
-
-            waitForSignal()
-
-            val write = async {
-                store.updateData {
-                    WRITE_BOOLEAN(it)
-                }
-            }
-            waitForSignal()
-            write.await()
-        }
-    }
-
-    class InterleavedUpdateDataWithReadOkioService : InterleavedUpdateDataWithReadFileService()
-
-    @Test
     fun testUpdateDataExceptionUnblocksOtherProcessFromWriting_file() =
         testUpdateDataExceptionUnblocksOtherProcessFromWriting_runner(StorageVariant.FILE)
 
diff --git a/datastore/datastore-core/src/androidTest/java/androidx/datastore/core/multiprocess/MultiProcessDataStoreIpcTest.kt b/datastore/datastore-core/src/androidTest/java/androidx/datastore/core/multiprocess/MultiProcessDataStoreIpcTest.kt
index e02fc2d..677bacb 100644
--- a/datastore/datastore-core/src/androidTest/java/androidx/datastore/core/multiprocess/MultiProcessDataStoreIpcTest.kt
+++ b/datastore/datastore-core/src/androidTest/java/androidx/datastore/core/multiprocess/MultiProcessDataStoreIpcTest.kt
@@ -20,14 +20,19 @@
 import androidx.datastore.core.multiprocess.ipcActions.SetTextAction
 import androidx.datastore.core.multiprocess.ipcActions.StorageVariant
 import androidx.datastore.core.multiprocess.ipcActions.createMultiProcessTestDatastore
+import androidx.datastore.core.multiprocess.ipcActions.datastore
 import androidx.datastore.core.twoWayIpc.InterProcessCompletable
+import androidx.datastore.core.twoWayIpc.IpcAction
 import androidx.datastore.core.twoWayIpc.IpcUnit
+import androidx.datastore.core.twoWayIpc.TwoWayIpcSubject
+import androidx.datastore.testing.TestMessageProto.FooProto
 import com.google.common.truth.Truth.assertThat
 import kotlinx.coroutines.CompletableDeferred
 import kotlinx.coroutines.async
 import kotlinx.coroutines.awaitAll
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.flow.first
+import kotlinx.parcelize.Parcelize
 import org.junit.Rule
 import org.junit.Test
 import org.junit.rules.TemporaryFolder
@@ -191,4 +196,94 @@
                 assertThat(it.integer).isEqualTo(99)
             }
         }
+
+    @Test
+    fun testInterleavedUpdateDataWithLocalRead_file() =
+        testInterleavedUpdateDataWithLocalRead(StorageVariant.FILE)
+
+    @Test
+    fun testInterleavedUpdateDataWithLocalRead_okio() =
+        testInterleavedUpdateDataWithLocalRead(StorageVariant.OKIO)
+
+    @Parcelize
+    private data class InterleavedDoubleUpdateAction(
+        val updatedInteger: InterProcessCompletable<IpcUnit> = InterProcessCompletable(),
+        val unblockBooleanWrite: InterProcessCompletable<IpcUnit> = InterProcessCompletable(),
+        val willWriteBooleanData: InterProcessCompletable<IpcUnit> = InterProcessCompletable(),
+    ) : IpcAction<IpcUnit>() {
+        override suspend fun invokeInRemoteProcess(
+            subject: TwoWayIpcSubject
+        ): IpcUnit {
+            subject.datastore.updateData {
+                it.toBuilder().setInteger(
+                    it.integer + 1
+                ).build()
+            }
+            updatedInteger.complete(subject, IpcUnit)
+            unblockBooleanWrite.await(subject)
+            willWriteBooleanData.complete(subject, IpcUnit)
+            subject.datastore.updateData {
+                it.toBuilder().setBoolean(true).build()
+            }
+            return IpcUnit
+        }
+    }
+    private fun testInterleavedUpdateDataWithLocalRead(storageVariant: StorageVariant) =
+        multiProcessRule.runTest {
+            val subject = multiProcessRule.createConnection().createSubject(
+                multiProcessRule.datastoreScope
+            )
+            val file = tmpFolder.newFile()
+            val dataStore = createMultiProcessTestDatastore(
+                filePath = file.canonicalPath,
+                storageVariant = storageVariant,
+                hostDatastoreScope = multiProcessRule.datastoreScope,
+                subjects = arrayOf(subject)
+            )
+            // invalidate local cache
+            assertThat(dataStore.data.first()).isEqualTo(FooProto.getDefaultInstance())
+            val remoteAction = InterleavedDoubleUpdateAction()
+            val remoteActionExecution = async {
+                subject.invokeInRemoteProcess(remoteAction)
+            }
+            // Queue and start local write
+            val writeStarted = CompletableDeferred<Unit>()
+            val finishWrite = CompletableDeferred<Unit>()
+
+            // wait for remote to write the int value
+            remoteAction.updatedInteger.await(subject)
+
+            val hostWrite = async {
+                dataStore.updateData {
+                    writeStarted.complete(Unit)
+                    finishWrite.await()
+                    FooProto.newBuilder().setText("hostValue").build()
+                }
+            }
+            writeStarted.await()
+            // our write is blocked so we should only see the int value for now
+            assertThat(dataStore.data.first()).isEqualTo(
+                FooProto.newBuilder().setInteger(1).build()
+            )
+            // unblock the remote write but it will be blocked as we already have a write
+            // lock in host process
+            remoteAction.unblockBooleanWrite.complete(subject, IpcUnit)
+            // wait for remote to be ready to write
+            remoteAction.willWriteBooleanData.await(subject)
+            // delay some to ensure remote is really blocked
+            delay(200)
+            finishWrite.complete(Unit)
+            // wait for both
+            listOf(hostWrite, remoteActionExecution).awaitAll()
+            // both writes committed
+            assertThat(
+                dataStore.data.first()
+            ).isEqualTo(
+                FooProto.getDefaultInstance().toBuilder()
+                    .setText("hostValue")
+                    // int is not set since local did override it w/ default
+                    .setBoolean(true)
+                    .build()
+            )
+        }
 }