1 // Copyright (c) 2011-2012 Ryan Prichard
3 // Permission is hereby granted, free of charge, to any person obtaining a copy
4 // of this software and associated documentation files (the "Software"), to
5 // deal in the Software without restriction, including without limitation the
6 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 // sell copies of the Software, and to permit persons to whom the Software is
8 // furnished to do so, subject to the following conditions:
10 // The above copyright notice and this permission notice shall be included in
11 // all copies or substantial portions of the Software.
13 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
25 #include "EventLoop.h"
26 #include "NamedPipe.h"
27 #include "../shared/DebugClient.h"
28 #include "../shared/StringUtil.h"
29 #include "../shared/WindowsSecurity.h"
30 #include "../shared/WinptyAssert.h"
32 // Returns true if anything happens (data received, data sent, pipe error).
33 bool NamedPipe::serviceIo(std::vector<HANDLE> *waitHandles)
35 bool justConnected = false;
36 const auto kError = ServiceResult::Error;
37 const auto kProgress = ServiceResult::Progress;
38 const auto kNoProgress = ServiceResult::NoProgress;
39 if (m_handle == NULL) {
42 if (m_connectEvent.get() != nullptr) {
43 // We're still connecting this server pipe. Check whether the pipe is
44 // now connected. If it isn't, add the pipe to the list of handles to
48 GetOverlappedResult(m_handle, &m_connectOver, &actual, FALSE);
49 if (!success && GetLastError() == ERROR_PIPE_CONNECTED) {
50 // I'm not sure this can happen, but it's easy to handle if it
55 ASSERT(GetLastError() == ERROR_IO_INCOMPLETE &&
56 "Pended ConnectNamedPipe call failed");
57 waitHandles->push_back(m_connectEvent.get());
59 TRACE("Server pipe [%s] connected",
60 utf8FromWide(m_name).c_str());
61 m_connectEvent.dispose();
66 const auto readProgress = m_inputWorker ? m_inputWorker->service() : kNoProgress;
67 const auto writeProgress = m_outputWorker ? m_outputWorker->service() : kNoProgress;
68 if (readProgress == kError || writeProgress == kError) {
72 if (m_inputWorker && m_inputWorker->getWaitEvent() != nullptr) {
73 waitHandles->push_back(m_inputWorker->getWaitEvent());
75 if (m_outputWorker && m_outputWorker->getWaitEvent() != nullptr) {
76 waitHandles->push_back(m_outputWorker->getWaitEvent());
79 || readProgress == kProgress
80 || writeProgress == kProgress;
83 // manual reset, initially unset
84 static OwnedHandle createEvent() {
85 HANDLE ret = CreateEventW(nullptr, TRUE, FALSE, nullptr);
86 ASSERT(ret != nullptr && "CreateEventW failed");
87 return OwnedHandle(ret);
90 NamedPipe::IoWorker::IoWorker(NamedPipe &namedPipe) :
91 m_namedPipe(namedPipe),
92 m_event(createEvent())
96 NamedPipe::ServiceResult NamedPipe::IoWorker::service()
98 ServiceResult progress = ServiceResult::NoProgress;
101 BOOL ret = GetOverlappedResult(m_namedPipe.m_handle, &m_over, &actual, FALSE);
103 if (GetLastError() == ERROR_IO_INCOMPLETE) {
104 // There is a pending I/O.
108 return ServiceResult::Error;
111 ResetEvent(m_event.get());
115 progress = ServiceResult::Progress;
119 while (shouldIssueIo(&nextSize, &isRead)) {
120 m_currentIoSize = nextSize;
122 memset(&m_over, 0, sizeof(m_over));
123 m_over.hEvent = m_event.get();
125 ? ReadFile(m_namedPipe.m_handle, m_buffer, nextSize, &actual, &m_over)
126 : WriteFile(m_namedPipe.m_handle, m_buffer, nextSize, &actual, &m_over);
128 if (GetLastError() == ERROR_IO_PENDING) {
129 // There is a pending I/O.
134 return ServiceResult::Error;
137 ResetEvent(m_event.get());
140 progress = ServiceResult::Progress;
145 // This function is called after CancelIo has returned. We need to block until
146 // the I/O operations have completed, which should happen very quickly.
147 // https://blogs.msdn.microsoft.com/oldnewthing/20110202-00/?p=11613
148 void NamedPipe::IoWorker::waitForCanceledIo()
152 GetOverlappedResult(m_namedPipe.m_handle, &m_over, &actual, TRUE);
157 HANDLE NamedPipe::IoWorker::getWaitEvent()
159 return m_pending ? m_event.get() : NULL;
162 void NamedPipe::InputWorker::completeIo(DWORD size)
164 m_namedPipe.m_inQueue.append(m_buffer, size);
167 bool NamedPipe::InputWorker::shouldIssueIo(DWORD *size, bool *isRead)
170 ASSERT(!m_namedPipe.isConnecting());
171 if (m_namedPipe.isClosed()) {
173 } else if (m_namedPipe.m_inQueue.size() < m_namedPipe.readBufferSize()) {
181 void NamedPipe::OutputWorker::completeIo(DWORD size)
183 ASSERT(size == m_currentIoSize);
186 bool NamedPipe::OutputWorker::shouldIssueIo(DWORD *size, bool *isRead)
189 if (!m_namedPipe.m_outQueue.empty()) {
190 auto &out = m_namedPipe.m_outQueue;
191 const DWORD writeSize = std::min<size_t>(out.size(), kIoSize);
192 std::copy(&out[0], &out[writeSize], m_buffer);
193 out.erase(0, writeSize);
201 DWORD NamedPipe::OutputWorker::getPendingIoSize()
203 return m_pending ? m_currentIoSize : 0;
206 void NamedPipe::openServerPipe(LPCWSTR pipeName, OpenMode::t openMode,
207 int outBufferSize, int inBufferSize) {
209 ASSERT((openMode & OpenMode::Duplex) != 0);
210 const DWORD winOpenMode =
211 ((openMode & OpenMode::Reading) ? PIPE_ACCESS_INBOUND : 0)
212 | ((openMode & OpenMode::Writing) ? PIPE_ACCESS_OUTBOUND : 0)
213 | FILE_FLAG_FIRST_PIPE_INSTANCE
214 | FILE_FLAG_OVERLAPPED;
215 const auto sd = createPipeSecurityDescriptorOwnerFullControl();
216 ASSERT(sd && "error creating data pipe SECURITY_DESCRIPTOR");
217 SECURITY_ATTRIBUTES sa = {};
218 sa.nLength = sizeof(sa);
219 sa.lpSecurityDescriptor = sd.get();
220 HANDLE handle = CreateNamedPipeW(
222 /*dwOpenMode=*/winOpenMode,
223 /*dwPipeMode=*/rejectRemoteClientsPipeFlag(),
225 /*nOutBufferSize=*/outBufferSize,
226 /*nInBufferSize=*/inBufferSize,
227 /*nDefaultTimeOut=*/30000,
229 TRACE("opened server pipe [%s], handle == %p",
230 utf8FromWide(pipeName).c_str(), handle);
231 ASSERT(handle != INVALID_HANDLE_VALUE && "Could not open server pipe");
234 m_openMode = openMode;
236 // Start an asynchronous connection attempt.
237 m_connectEvent = createEvent();
238 memset(&m_connectOver, 0, sizeof(m_connectOver));
239 m_connectOver.hEvent = m_connectEvent.get();
240 BOOL success = ConnectNamedPipe(m_handle, &m_connectOver);
241 const auto err = GetLastError();
242 if (!success && err == ERROR_PIPE_CONNECTED) {
246 TRACE("Server pipe [%s] connected", utf8FromWide(pipeName).c_str());
247 m_connectEvent.dispose();
249 } else if (err != ERROR_IO_PENDING) {
250 ASSERT(false && "ConnectNamedPipe call failed");
254 void NamedPipe::connectToServer(LPCWSTR pipeName, OpenMode::t openMode)
257 ASSERT((openMode & OpenMode::Duplex) != 0);
258 HANDLE handle = CreateFileW(
260 GENERIC_READ | GENERIC_WRITE,
264 SECURITY_SQOS_PRESENT | SECURITY_IDENTIFICATION | FILE_FLAG_OVERLAPPED,
266 TRACE("connected to [%s], handle == %p",
267 utf8FromWide(pipeName).c_str(), handle);
268 ASSERT(handle != INVALID_HANDLE_VALUE && "Could not connect to pipe");
271 m_openMode = openMode;
275 void NamedPipe::startPipeWorkers()
277 if (m_openMode & OpenMode::Reading) {
278 m_inputWorker.reset(new InputWorker(*this));
280 if (m_openMode & OpenMode::Writing) {
281 m_outputWorker.reset(new OutputWorker(*this));
285 size_t NamedPipe::bytesToSend()
287 ASSERT(m_openMode & OpenMode::Writing);
288 auto ret = m_outQueue.size();
289 if (m_outputWorker != NULL) {
290 ret += m_outputWorker->getPendingIoSize();
295 void NamedPipe::write(const void *data, size_t size)
297 ASSERT(m_openMode & OpenMode::Writing);
298 m_outQueue.append(reinterpret_cast<const char*>(data), size);
301 void NamedPipe::write(const char *text)
303 write(text, strlen(text));
306 size_t NamedPipe::readBufferSize()
308 ASSERT(m_openMode & OpenMode::Reading);
309 return m_readBufferSize;
312 void NamedPipe::setReadBufferSize(size_t size)
314 ASSERT(m_openMode & OpenMode::Reading);
315 m_readBufferSize = size;
318 size_t NamedPipe::bytesAvailable()
320 ASSERT(m_openMode & OpenMode::Reading);
321 return m_inQueue.size();
324 size_t NamedPipe::peek(void *data, size_t size)
326 ASSERT(m_openMode & OpenMode::Reading);
327 const auto out = reinterpret_cast<char*>(data);
328 const size_t ret = std::min(size, m_inQueue.size());
329 std::copy(&m_inQueue[0], &m_inQueue[ret], out);
333 size_t NamedPipe::read(void *data, size_t size)
335 size_t ret = peek(data, size);
336 m_inQueue.erase(0, ret);
340 std::string NamedPipe::readToString(size_t size)
342 ASSERT(m_openMode & OpenMode::Reading);
343 size_t retSize = std::min(size, m_inQueue.size());
344 std::string ret = m_inQueue.substr(0, retSize);
345 m_inQueue.erase(0, retSize);
349 std::string NamedPipe::readAllToString()
351 ASSERT(m_openMode & OpenMode::Reading);
352 std::string ret = m_inQueue;
357 void NamedPipe::closePipe()
359 if (m_handle == NULL) {
363 if (m_connectEvent.get() != nullptr) {
365 GetOverlappedResult(m_handle, &m_connectOver, &actual, TRUE);
366 m_connectEvent.dispose();
369 m_inputWorker->waitForCanceledIo();
370 m_inputWorker.reset();
372 if (m_outputWorker) {
373 m_outputWorker->waitForCanceledIo();
374 m_outputWorker.reset();
376 CloseHandle(m_handle);