Statsd sends active config broadcasts

Statsd now sends a broadcast to a uid when the set of active configs for
that uid changes. Also adds an adb command to help with debugging.

More GTS tests and unit tests are required; they will follow in a later change.
Test: GTS in topic
Bug: 123372077
Change-Id: Ib079018ded85d002581ffc2ba1240138ce7a54e7
diff --git a/cmds/statsd/src/StatsLogProcessor.cpp b/cmds/statsd/src/StatsLogProcessor.cpp
index dd18bd4..653ef2e 100644
--- a/cmds/statsd/src/StatsLogProcessor.cpp
+++ b/cmds/statsd/src/StatsLogProcessor.cpp
@@ -88,12 +88,15 @@
                                      const sp<AlarmMonitor>& anomalyAlarmMonitor,
                                      const sp<AlarmMonitor>& periodicAlarmMonitor,
                                      const int64_t timeBaseNs,
-                                     const std::function<bool(const ConfigKey&)>& sendBroadcast)
+                                     const std::function<bool(const ConfigKey&)>& sendBroadcast,
+                                     const std::function<bool(
+                                            const int&, const vector<int64_t>&)>& activateBroadcast)
     : mUidMap(uidMap),
       mPullerManager(pullerManager),
       mAnomalyAlarmMonitor(anomalyAlarmMonitor),
       mPeriodicAlarmMonitor(periodicAlarmMonitor),
       mSendBroadcast(sendBroadcast),
+      mSendActivationBroadcast(activateBroadcast),
       mTimeBaseNs(timeBaseNs),
       mLargestTimestampSeen(0),
       mLastTimestampSeen(0) {
@@ -223,11 +226,73 @@
         mapIsolatedUidToHostUidIfNecessaryLocked(event);
     }
 
+    std::unordered_set<int> uidsWithActiveConfigsChanged;
+    std::unordered_map<int, std::vector<int64_t>> activeConfigsPerUid;
     // pass the event to metrics managers.
     for (auto& pair : mMetricsManagers) {
+        int uid = pair.first.GetUid();
+        int64_t configId = pair.first.GetId();
+        bool isPrevActive = pair.second->isActive();
         pair.second->onLogEvent(*event);
+        bool isCurActive = pair.second->isActive();
+        // Map all active configs by uid.
+        if (isCurActive) {
+            auto activeConfigs = activeConfigsPerUid.find(uid);
+            if (activeConfigs != activeConfigsPerUid.end()) {
+                activeConfigs->second.push_back(configId);
+            } else {
+                vector<int64_t> newActiveConfigs;
+                newActiveConfigs.push_back(configId);
+                activeConfigsPerUid[uid] = newActiveConfigs;
+            }
+        }
+        // The activation state of this config changed.
+        if (isPrevActive != isCurActive) {
+            VLOG("Active status changed for uid  %d", uid);
+            uidsWithActiveConfigsChanged.insert(uid);
+            StatsdStats::getInstance().noteActiveStatusChanged(pair.first, isCurActive);
+        }
         flushIfNecessaryLocked(event->GetElapsedTimestampNs(), pair.first, *(pair.second));
     }
+
+    for (int uid : uidsWithActiveConfigsChanged) {
+        // Send broadcast so that receivers can pull data. Each uid is rate limited on its own.
+        auto lastBroadcastTime = mLastActivationBroadcastTimes.find(uid);
+        if (lastBroadcastTime != mLastActivationBroadcastTimes.end()) {
+            if (currentTimestampNs - lastBroadcastTime->second <
+                    StatsdStats::kMinActivationBroadcastPeriodNs) {
+                VLOG("StatsD would've sent an activation broadcast but the rate limit stopped us.");
+                continue;  // Skip only this uid; the other changed uids must still be notified.
+            }
+        }
+        auto activeConfigs = activeConfigsPerUid.find(uid);
+        if (activeConfigs != activeConfigsPerUid.end()) {
+            if (mSendActivationBroadcast(uid, activeConfigs->second)) {
+                VLOG("StatsD sent activation notice for uid %d", uid);
+                mLastActivationBroadcastTimes[uid] = currentTimestampNs;  // Only on success.
+            }
+        } else {  // The uid has no active config in the map: broadcast an empty list.
+            std::vector<int64_t> emptyActiveConfigs;
+            if (mSendActivationBroadcast(uid, emptyActiveConfigs)) {
+                VLOG("StatsD sent EMPTY activation notice for uid %d", uid);
+                mLastActivationBroadcastTimes[uid] = currentTimestampNs;  // Only on success.
+            }
+        }
+    }
+}
+
+void StatsLogProcessor::GetActiveConfigs(const int uid, vector<int64_t>& outActiveConfigs) {
+    std::lock_guard<std::mutex> guard(mMetricsMutex);  // Held until this function returns.
+    GetActiveConfigsLocked(uid, outActiveConfigs);  // Fills the out-param with the uid's ids.
+}
+
+void StatsLogProcessor::GetActiveConfigsLocked(const int uid, vector<int64_t>& outActiveConfigs) {
+    outActiveConfigs.clear();  // Discard prior contents; the result holds only this uid's ids.
+    for (const auto& entry : mMetricsManagers) {
+        const auto& configKey = entry.first;
+        const bool ownedAndActive = configKey.GetUid() == uid && entry.second->isActive();
+        if (ownedAndActive) outActiveConfigs.push_back(configKey.GetId());
+    }
 }
 
 void StatsLogProcessor::OnConfigUpdated(const int64_t timestampNs, const ConfigKey& key,
@@ -444,6 +509,18 @@
 
     mLastBroadcastTimes.erase(key);
 
+    int uid = key.GetUid();
+    bool lastConfigForUid = true;  // Cleared if any entry in mMetricsManagers has this uid.
+    for (const auto& it : mMetricsManagers) {  // By reference: do not copy each map entry.
+        if (it.first.GetUid() == uid) {
+            lastConfigForUid = false;
+            break;
+        }
+    }
+    if (lastConfigForUid) {
+        mLastActivationBroadcastTimes.erase(uid);  // Forget this uid's last broadcast time.
+    }
+
     if (mMetricsManagers.empty()) {
         mPullerManager->ForceClearPullerCache();
     }