libbinder: reverse connections
When connecting to an RPC client server, you can request to serve a
threadpool so that you can receive callbacks from it.
Future considerations:
- starting threads dynamically (likely very, very soon after this CL)
- combining threadpools (as needed)
Bug: 185167543
Test: binderRpcTest
Change-Id: I992959e963ebc1b3da2f89fdb6c1ec625cb51af4
diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp
index efc70e6..80708df 100644
--- a/libs/binder/tests/binderRpcTest.cpp
+++ b/libs/binder/tests/binderRpcTest.cpp
@@ -14,6 +14,7 @@
* limitations under the License.
*/
+#include <BnBinderRpcCallback.h>
#include <BnBinderRpcSession.h>
#include <BnBinderRpcTest.h>
#include <aidl/IBinderRpcTest.h>
@@ -34,6 +35,7 @@
#include <cstdlib>
#include <iostream>
#include <thread>
+#include <type_traits>
#include <sys/prctl.h>
#include <unistd.h>
@@ -89,6 +91,22 @@
};
std::atomic<int32_t> MyBinderRpcSession::gNum;
+class MyBinderRpcCallback : public BnBinderRpcCallback {
+ Status sendCallback(const std::string& value) {
+ std::unique_lock _l(mMutex);
+ mValues.push_back(value);
+ _l.unlock();
+ mCv.notify_one();
+ return Status::ok();
+ }
+ Status sendOnewayCallback(const std::string& value) { return sendCallback(value); }
+
+public:
+ std::mutex mMutex;
+ std::condition_variable mCv;
+ std::vector<std::string> mValues;
+};
+
class MyBinderRpcTest : public BnBinderRpcTest {
public:
wp<RpcServer> server;
@@ -187,6 +205,27 @@
return sleepMs(ms);
}
+ Status doCallback(const sp<IBinderRpcCallback>& callback, bool oneway, bool delayed,
+ const std::string& value) override {
+ if (callback == nullptr) {
+ return Status::fromExceptionCode(Status::EX_NULL_POINTER);
+ }
+
+ if (delayed) {
+ std::thread([=]() {
+ ALOGE("Executing delayed callback: '%s'", value.c_str());
+ (void)doCallback(callback, oneway, false, value);
+ }).detach();
+ return Status::ok();
+ }
+
+ if (oneway) {
+ return callback->sendOnewayCallback(value);
+ }
+
+ return callback->sendCallback(value);
+ }
+
Status die(bool cleanup) override {
if (cleanup) {
exit(1);
@@ -308,6 +347,9 @@
BinderRpcTestProcessSession(BinderRpcTestProcessSession&&) = default;
~BinderRpcTestProcessSession() {
+ EXPECT_NE(nullptr, rootIface);
+ if (rootIface == nullptr) return;
+
if (!expectAlreadyShutdown) {
std::vector<int32_t> remoteCounts;
// calling over any sessions counts across all sessions
@@ -348,7 +390,7 @@
// This creates a new process serving an interface on a certain number of
// threads.
ProcessSession createRpcTestSocketServerProcess(
- size_t numThreads, size_t numSessions,
+ size_t numThreads, size_t numSessions, size_t numReverseConnections,
const std::function<void(const sp<RpcServer>&)>& configure) {
CHECK_GE(numSessions, 1) << "Must have at least one session to a server";
@@ -404,6 +446,8 @@
for (size_t i = 0; i < numSessions; i++) {
sp<RpcSession> session = RpcSession::make();
+ session->setMaxReverseConnections(numReverseConnections);
+
switch (socketType) {
case SocketType::UNIX:
if (session->setupUnixDomainClient(addr.c_str())) goto success;
@@ -425,9 +469,11 @@
}
BinderRpcTestProcessSession createRpcTestSocketServerProcess(size_t numThreads,
- size_t numSessions = 1) {
+ size_t numSessions = 1,
+ size_t numReverseConnections = 0) {
BinderRpcTestProcessSession ret{
.proc = createRpcTestSocketServerProcess(numThreads, numSessions,
+ numReverseConnections,
[&](const sp<RpcServer>& server) {
sp<MyBinderRpcTest> service =
new MyBinderRpcTest;
@@ -895,6 +941,38 @@
for (auto& t : threads) t.join();
}
+TEST_P(BinderRpc, Callbacks) {
+ const static std::string kTestString = "good afternoon!";
+
+ for (bool oneway : {true, false}) {
+ for (bool delayed : {true, false}) {
+ auto proc = createRpcTestSocketServerProcess(1, 1, 1);
+ auto cb = sp<MyBinderRpcCallback>::make();
+
+ EXPECT_OK(proc.rootIface->doCallback(cb, oneway, delayed, kTestString));
+
+ using std::literals::chrono_literals::operator""s;
+ std::unique_lock<std::mutex> _l(cb->mMutex);
+ cb->mCv.wait_for(_l, 1s, [&] { return !cb->mValues.empty(); });
+
+ EXPECT_EQ(cb->mValues.size(), 1) << "oneway: " << oneway << "delayed: " << delayed;
+ if (cb->mValues.empty()) continue;
+ EXPECT_EQ(cb->mValues.at(0), kTestString)
+ << "oneway: " << oneway << "delayed: " << delayed;
+
+ // since we are severing the connection, we need to go ahead and
+ // tell the server to shutdown and exit so that waitpid won't hang
+ EXPECT_OK(proc.rootIface->scheduleShutdown());
+
+ // since this session has a reverse connection w/ a threadpool, we
+ // need to manually shut it down
+ EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdown());
+
+ proc.expectAlreadyShutdown = true;
+ }
+ }
+}
+
TEST_P(BinderRpc, Die) {
for (bool doDeathCleanup : {true, false}) {
auto proc = createRpcTestSocketServerProcess(1);