blob: b0f1460a16c57c3da5779ae5ecab990310917b60 [file] [log] [blame]
Eric Holka1cc5402020-12-04 23:37:11 +00001/*
2 * Copyright (C) 2020 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#ifndef ART_RUNTIME_BASE_MESSAGE_QUEUE_H_
18#define ART_RUNTIME_BASE_MESSAGE_QUEUE_H_
19
20#include <deque>
21#include <optional>
22#include <variant>
23
24#include "base/time_utils.h"
25#include "mutex.h"
26#include "thread.h"
27
28#pragma clang diagnostic push
29#pragma clang diagnostic error "-Wconversion"
30
31namespace art {
32
33struct TimeoutExpiredMessage {};
34
35// MessageQueue is an unbounded multiple producer, multiple consumer (MPMC) queue that can be
36// specialized to send messages between threads. The queue is parameterized by a set of types that
37// serve as the message types. Note that messages are passed by value, so smaller messages should be
38// used when possible.
39//
40// Example:
41//
42// struct IntMessage { int value; };
43// struct DoubleMessage { double value; };
44//
45// MessageQueue<IntMessage, DoubleMessage> queue;
46//
47// queue.SendMessage(IntMessage{42});
48// queue.SendMessage(DoubleMessage{42.0});
49//
50// auto message = queue.ReceiveMessage(); // message is a std::variant of the different
51// // message types.
52//
53// if (std::holds_alternative<IntMessage>(message)) {
54// cout << "Received int message with value " << std::get<IntMessage>(message) << "\n";
55// }
56//
57// The message queue also supports a special timeout message. This is scheduled to be sent by the
58// SetTimeout method, which will cause the MessageQueue to deliver a TimeoutExpiredMessage after the
59// time period has elapsed. Note that only one timeout can be active can be active at a time, and
60// subsequent calls to SetTimeout will overwrite any existing timeout.
61//
62// Example:
63//
64// queue.SetTimeout(5000); // request to send TimeoutExpiredMessage in 5000ms.
65//
66// auto message = queue.ReceiveMessage(); // blocks for 5000ms and returns
67// // TimeoutExpiredMessage
68//
69// Note additional messages can be sent in the meantime and a ReceiveMessage call will wake up to
70// return that message. The TimeoutExpiredMessage will still be sent at the right time.
71//
72// Finally, MessageQueue has a SwitchReceive method that can be used to run different code depending
73// on the type of message received. SwitchReceive takes a set of lambda expressions that take one
74// argument of one of the allowed message types. An additional lambda expression that takes a single
75// auto argument can be used to serve as a catch-all case.
76//
77// Example:
78//
79// queue.SwitchReceive(
80// [&](IntMessage message) {
81// cout << "Received int: " << message.value << "\n";
82// },
83// [&](DoubleMessage message) {
84// cout << "Received double: " << message.value << "\n";
85// },
86// [&](auto other_message) {
87// // Another message was received. In this case, it's TimeoutExpiredMessage.
88// }
89// )
90//
91// For additional examples, see message_queue_test.cc.
92template <typename... MessageTypes>
93class MessageQueue {
94 public:
95 using Message = std::variant<TimeoutExpiredMessage, MessageTypes...>;
96
97 // Adds a message to the message queue, which can later be received with ReceiveMessage. See class
98 // comment for more details.
99 void SendMessage(Message message) {
100 // TimeoutExpiredMessage should not be sent manually.
101 DCHECK(!std::holds_alternative<TimeoutExpiredMessage>(message));
102 Thread* self = Thread::Current();
103 MutexLock lock{self, mutex_};
104 messages_.push_back(message);
105 cv_.Signal(self);
106 }
107
108 // Schedule a TimeoutExpiredMessage to be delivered in timeout_milliseconds. See class comment for
109 // more details.
110 void SetTimeout(uint64_t timeout_milliseconds) {
111 Thread* self = Thread::Current();
112 MutexLock lock{self, mutex_};
113 deadline_milliseconds_ = timeout_milliseconds + MilliTime();
114 cv_.Signal(self);
115 }
116
117 // Remove and return a message from the queue. If no message is available, ReceiveMessage will
118 // block until one becomes available. See class comment for more details.
119 Message ReceiveMessage() {
120 Thread* self = Thread::Current();
121 MutexLock lock{self, mutex_};
122
123 // Loop until we receive a message
124 while (true) {
125 uint64_t const current_time = MilliTime();
126 // First check if the deadline has passed.
127 if (deadline_milliseconds_.has_value() && deadline_milliseconds_.value() < current_time) {
128 deadline_milliseconds_.reset();
129 return TimeoutExpiredMessage{};
130 }
131
132 // Check if there is a message in the queue.
133 if (messages_.size() > 0) {
134 Message message = messages_.front();
135 messages_.pop_front();
136 return message;
137 }
138
139 // Otherwise, wait until we have a message or a timeout.
140 if (deadline_milliseconds_.has_value()) {
141 DCHECK_LE(current_time, deadline_milliseconds_.value());
142 int64_t timeout = static_cast<int64_t>(deadline_milliseconds_.value() - current_time);
143 cv_.TimedWait(self, timeout, /*ns=*/0);
144 } else {
145 cv_.Wait(self);
146 }
147 }
148 }
149
150 // Waits for a message and applies the appropriate function argument to the received message. See
151 // class comment for more details.
152 template <typename ReturnType = void, typename... Fn>
153 ReturnType SwitchReceive(Fn... case_fn) {
154 struct Matcher : Fn... {
155 using Fn::operator()...;
156 } matcher{case_fn...};
157 return std::visit(matcher, ReceiveMessage());
158 }
159
160 private:
161 Mutex mutex_{"MessageQueue Mutex"};
162 ConditionVariable cv_{"MessageQueue ConditionVariable", mutex_};
163
164 std::deque<Message> messages_ GUARDED_BY(mutex_);
165 std::optional<uint64_t> deadline_milliseconds_ GUARDED_BY(mutex_);
166};
167
168} // namespace art
169
170#pragma clang diagnostic pop // -Wconversion
171
172#endif // ART_RUNTIME_BASE_MESSAGE_QUEUE_H_