| /* Copyright Joyent, Inc. and other Node contributors. All rights reserved. |
| * |
| * Permission is hereby granted, free of charge, to any person obtaining a copy |
| * of this software and associated documentation files (the "Software"), to |
| * deal in the Software without restriction, including without limitation the |
| * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or |
| * sell copies of the Software, and to permit persons to whom the Software is |
| * furnished to do so, subject to the following conditions: |
| * |
| * The above copyright notice and this permission notice shall be included in |
| * all copies or substantial portions of the Software. |
| * |
| * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
| * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING |
| * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
| * IN THE SOFTWARE. |
| */ |
| |
| #include <assert.h> |
| #include <io.h> |
| #include <string.h> |
| #include <stdio.h> |
| |
| #include "uv.h" |
| #include "internal.h" |
| #include "handle-inl.h" |
| #include "stream-inl.h" |
| #include "req-inl.h" |
| |
| |
| /* A zero-size buffer for use by uv_pipe_read */ |
| static char uv_zero_[] = ""; |
| |
| /* Null uv_buf_t */ |
| static const uv_buf_t uv_null_buf_ = { 0, NULL }; |
| |
| /* The timeout that the pipe will wait for the remote end to write data */ |
| /* when the local ends wants to shut it down. */ |
| static const int64_t eof_timeout = 50; /* ms */ |
| |
| static const int default_pending_pipe_instances = 4; |
| |
| /* IPC protocol flags. */ |
| #define UV_IPC_RAW_DATA 0x0001 |
| #define UV_IPC_TCP_SERVER 0x0002 |
| #define UV_IPC_TCP_CONNECTION 0x0004 |
| |
| /* IPC frame header. */ |
| typedef struct { |
| int flags; |
| uint64_t raw_data_length; |
| } uv_ipc_frame_header_t; |
| |
| /* IPC frame, which contains an imported TCP socket stream. */ |
| typedef struct { |
| uv_ipc_frame_header_t header; |
| WSAPROTOCOL_INFOW socket_info; |
| } uv_ipc_frame_uv_stream; |
| |
| static void eof_timer_init(uv_pipe_t* pipe); |
| static void eof_timer_start(uv_pipe_t* pipe); |
| static void eof_timer_stop(uv_pipe_t* pipe); |
| static void eof_timer_cb(uv_timer_t* timer, int status); |
| static void eof_timer_destroy(uv_pipe_t* pipe); |
| static void eof_timer_close_cb(uv_handle_t* handle); |
| |
| |
| static void uv_unique_pipe_name(char* ptr, char* name, size_t size) { |
| _snprintf(name, size, "\\\\.\\pipe\\uv\\%p-%d", ptr, GetCurrentProcessId()); |
| } |
| |
| |
| int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { |
| uv_stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE); |
| |
| handle->reqs_pending = 0; |
| handle->handle = INVALID_HANDLE_VALUE; |
| handle->name = NULL; |
| handle->ipc_pid = 0; |
| handle->remaining_ipc_rawdata_bytes = 0; |
| handle->pending_ipc_info.socket_info = NULL; |
| handle->pending_ipc_info.tcp_connection = 0; |
| handle->ipc = ipc; |
| handle->non_overlapped_writes_tail = NULL; |
| |
| uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req); |
| |
| return 0; |
| } |
| |
| |
| static void uv_pipe_connection_init(uv_pipe_t* handle) { |
| uv_connection_init((uv_stream_t*) handle); |
| handle->read_req.data = handle; |
| handle->eof_timer = NULL; |
| } |
| |
| |
| static HANDLE open_named_pipe(WCHAR* name, DWORD* duplex_flags) { |
| HANDLE pipeHandle; |
| |
| /* |
| * Assume that we have a duplex pipe first, so attempt to |
| * connect with GENERIC_READ | GENERIC_WRITE. |
| */ |
| pipeHandle = CreateFileW(name, |
| GENERIC_READ | GENERIC_WRITE, |
| 0, |
| NULL, |
| OPEN_EXISTING, |
| FILE_FLAG_OVERLAPPED, |
| NULL); |
| if (pipeHandle != INVALID_HANDLE_VALUE) { |
| *duplex_flags = UV_HANDLE_READABLE | UV_HANDLE_WRITABLE; |
| return pipeHandle; |
| } |
| |
| /* |
| * If the pipe is not duplex CreateFileW fails with |
| * ERROR_ACCESS_DENIED. In that case try to connect |
| * as a read-only or write-only. |
| */ |
| if (GetLastError() == ERROR_ACCESS_DENIED) { |
| pipeHandle = CreateFileW(name, |
| GENERIC_READ | FILE_WRITE_ATTRIBUTES, |
| 0, |
| NULL, |
| OPEN_EXISTING, |
| FILE_FLAG_OVERLAPPED, |
| NULL); |
| |
| if (pipeHandle != INVALID_HANDLE_VALUE) { |
| *duplex_flags = UV_HANDLE_READABLE; |
| return pipeHandle; |
| } |
| } |
| |
| if (GetLastError() == ERROR_ACCESS_DENIED) { |
| pipeHandle = CreateFileW(name, |
| GENERIC_WRITE | FILE_READ_ATTRIBUTES, |
| 0, |
| NULL, |
| OPEN_EXISTING, |
| FILE_FLAG_OVERLAPPED, |
| NULL); |
| |
| if (pipeHandle != INVALID_HANDLE_VALUE) { |
| *duplex_flags = UV_HANDLE_WRITABLE; |
| return pipeHandle; |
| } |
| } |
| |
| return INVALID_HANDLE_VALUE; |
| } |
| |
| |
| uv_err_t uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access, |
| char* name, size_t nameSize) { |
| HANDLE pipeHandle; |
| int errorno; |
| uv_err_t err; |
| char* ptr = (char*)handle; |
| |
| for (;;) { |
| uv_unique_pipe_name(ptr, name, nameSize); |
| |
| pipeHandle = CreateNamedPipeA(name, |
| access | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE, |
| PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, 1, 65536, 65536, 0, |
| NULL); |
| |
| if (pipeHandle != INVALID_HANDLE_VALUE) { |
| /* No name collisions. We're done. */ |
| break; |
| } |
| |
| errorno = GetLastError(); |
| if (errorno != ERROR_PIPE_BUSY && errorno != ERROR_ACCESS_DENIED) { |
| err = uv__new_sys_error(errorno); |
| goto error; |
| } |
| |
| /* Pipe name collision. Increment the pointer and try again. */ |
| ptr++; |
| } |
| |
| if (CreateIoCompletionPort(pipeHandle, |
| loop->iocp, |
| (ULONG_PTR)handle, |
| 0) == NULL) { |
| err = uv__new_sys_error(GetLastError()); |
| goto error; |
| } |
| |
| uv_pipe_connection_init(handle); |
| handle->handle = pipeHandle; |
| |
| return uv_ok_; |
| |
| error: |
| if (pipeHandle != INVALID_HANDLE_VALUE) { |
| CloseHandle(pipeHandle); |
| } |
| |
| return err; |
| } |
| |
| |
| static int uv_set_pipe_handle(uv_loop_t* loop, uv_pipe_t* handle, |
| HANDLE pipeHandle, DWORD duplex_flags) { |
| NTSTATUS nt_status; |
| IO_STATUS_BLOCK io_status; |
| FILE_MODE_INFORMATION mode_info; |
| DWORD mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT; |
| |
| if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) { |
| /* If this returns ERROR_INVALID_PARAMETER we probably opened something */ |
| /* that is not a pipe. */ |
| if (GetLastError() == ERROR_INVALID_PARAMETER) { |
| SetLastError(WSAENOTSOCK); |
| } |
| return -1; |
| } |
| |
| /* Check if the pipe was created with FILE_FLAG_OVERLAPPED. */ |
| nt_status = pNtQueryInformationFile(pipeHandle, |
| &io_status, |
| &mode_info, |
| sizeof(mode_info), |
| FileModeInformation); |
| if (nt_status != STATUS_SUCCESS) { |
| return -1; |
| } |
| |
| if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT || |
| mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) { |
| /* Non-overlapped pipe. */ |
| handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE; |
| } else { |
| /* Overlapped pipe. Try to associate with IOCP. */ |
| if (CreateIoCompletionPort(pipeHandle, |
| loop->iocp, |
| (ULONG_PTR)handle, |
| 0) == NULL) { |
| handle->flags |= UV_HANDLE_EMULATE_IOCP; |
| } |
| } |
| |
| handle->handle = pipeHandle; |
| handle->flags |= duplex_flags; |
| |
| return 0; |
| } |
| |
| |
| static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) { |
| uv_loop_t* loop; |
| uv_pipe_t* handle; |
| uv_shutdown_t* req; |
| |
| req = (uv_shutdown_t*) parameter; |
| assert(req); |
| handle = (uv_pipe_t*) req->handle; |
| assert(handle); |
| loop = handle->loop; |
| assert(loop); |
| |
| FlushFileBuffers(handle->handle); |
| |
| /* Post completed */ |
| POST_COMPLETION_FOR_REQ(loop, req); |
| |
| return 0; |
| } |
| |
| |
| void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) { |
| DWORD result; |
| uv_shutdown_t* req; |
| NTSTATUS nt_status; |
| IO_STATUS_BLOCK io_status; |
| FILE_PIPE_LOCAL_INFORMATION pipe_info; |
| |
| if ((handle->flags & UV_HANDLE_CONNECTION) && |
| handle->shutdown_req != NULL && |
| handle->write_reqs_pending == 0) { |
| req = handle->shutdown_req; |
| |
| /* Clear the shutdown_req field so we don't go here again. */ |
| handle->shutdown_req = NULL; |
| |
| if (handle->flags & UV__HANDLE_CLOSING) { |
| UNREGISTER_HANDLE_REQ(loop, handle, req); |
| |
| /* Already closing. Cancel the shutdown. */ |
| if (req->cb) { |
| uv__set_artificial_error(loop, UV_ECANCELED); |
| req->cb(req, -1); |
| } |
| |
| DECREASE_PENDING_REQ_COUNT(handle); |
| return; |
| } |
| |
| /* Try to avoid flushing the pipe buffer in the thread pool. */ |
| nt_status = pNtQueryInformationFile(handle->handle, |
| &io_status, |
| &pipe_info, |
| sizeof pipe_info, |
| FilePipeLocalInformation); |
| |
| if (nt_status != STATUS_SUCCESS) { |
| /* Failure */ |
| UNREGISTER_HANDLE_REQ(loop, handle, req); |
| |
| handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */ |
| if (req->cb) { |
| uv__set_sys_error(loop, pRtlNtStatusToDosError(nt_status)); |
| req->cb(req, -1); |
| } |
| |
| DECREASE_PENDING_REQ_COUNT(handle); |
| return; |
| } |
| |
| if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) { |
| /* Short-circuit, no need to call FlushFileBuffers. */ |
| uv_insert_pending_req(loop, (uv_req_t*) req); |
| return; |
| } |
| |
| /* Run FlushFileBuffers in the thread pool. */ |
| result = QueueUserWorkItem(pipe_shutdown_thread_proc, |
| req, |
| WT_EXECUTELONGFUNCTION); |
| if (result) { |
| return; |
| |
| } else { |
| /* Failure. */ |
| UNREGISTER_HANDLE_REQ(loop, handle, req); |
| |
| handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */ |
| if (req->cb) { |
| uv__set_sys_error(loop, GetLastError()); |
| req->cb(req, -1); |
| } |
| |
| DECREASE_PENDING_REQ_COUNT(handle); |
| return; |
| } |
| } |
| |
| if (handle->flags & UV__HANDLE_CLOSING && |
| handle->reqs_pending == 0) { |
| assert(!(handle->flags & UV_HANDLE_CLOSED)); |
| |
| if (handle->flags & UV_HANDLE_CONNECTION) { |
| if (handle->pending_ipc_info.socket_info) { |
| free(handle->pending_ipc_info.socket_info); |
| handle->pending_ipc_info.socket_info = NULL; |
| } |
| |
| if (handle->flags & UV_HANDLE_EMULATE_IOCP) { |
| if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) { |
| UnregisterWait(handle->read_req.wait_handle); |
| handle->read_req.wait_handle = INVALID_HANDLE_VALUE; |
| } |
| if (handle->read_req.event_handle) { |
| CloseHandle(handle->read_req.event_handle); |
| handle->read_req.event_handle = NULL; |
| } |
| } |
| } |
| |
| if (handle->flags & UV_HANDLE_PIPESERVER) { |
| assert(handle->accept_reqs); |
| free(handle->accept_reqs); |
| handle->accept_reqs = NULL; |
| } |
| |
| uv__handle_close(handle); |
| } |
| } |
| |
| |
| void uv_pipe_pending_instances(uv_pipe_t* handle, int count) { |
| handle->pending_instances = count; |
| handle->flags |= UV_HANDLE_PIPESERVER; |
| } |
| |
| |
| /* Creates a pipe server. */ |
| int uv_pipe_bind(uv_pipe_t* handle, const char* name) { |
| uv_loop_t* loop = handle->loop; |
| int i, errorno, nameSize; |
| uv_pipe_accept_t* req; |
| |
| if (handle->flags & UV_HANDLE_BOUND) { |
| uv__set_sys_error(loop, WSAEINVAL); |
| return -1; |
| } |
| |
| if (!name) { |
| uv__set_sys_error(loop, WSAEINVAL); |
| return -1; |
| } |
| |
| if (!(handle->flags & UV_HANDLE_PIPESERVER)) { |
| handle->pending_instances = default_pending_pipe_instances; |
| } |
| |
| handle->accept_reqs = (uv_pipe_accept_t*) |
| malloc(sizeof(uv_pipe_accept_t) * handle->pending_instances); |
| if (!handle->accept_reqs) { |
| uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); |
| } |
| |
| for (i = 0; i < handle->pending_instances; i++) { |
| req = &handle->accept_reqs[i]; |
| uv_req_init(loop, (uv_req_t*) req); |
| req->type = UV_ACCEPT; |
| req->data = handle; |
| req->pipeHandle = INVALID_HANDLE_VALUE; |
| req->next_pending = NULL; |
| } |
| |
| /* Convert name to UTF16. */ |
| nameSize = uv_utf8_to_utf16(name, NULL, 0) * sizeof(WCHAR); |
| handle->name = (WCHAR*)malloc(nameSize); |
| if (!handle->name) { |
| uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); |
| } |
| |
| if (!uv_utf8_to_utf16(name, handle->name, nameSize / sizeof(WCHAR))) { |
| uv__set_sys_error(loop, GetLastError()); |
| return -1; |
| } |
| |
| /* |
| * Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE. |
| * If this fails then there's already a pipe server for the given pipe name. |
| */ |
| handle->accept_reqs[0].pipeHandle = CreateNamedPipeW(handle->name, |
| PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | |
| FILE_FLAG_FIRST_PIPE_INSTANCE, |
| PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, |
| PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL); |
| |
| if (handle->accept_reqs[0].pipeHandle == INVALID_HANDLE_VALUE) { |
| errorno = GetLastError(); |
| if (errorno == ERROR_ACCESS_DENIED) { |
| uv__set_error(loop, UV_EADDRINUSE, errorno); |
| } else if (errorno == ERROR_PATH_NOT_FOUND || errorno == ERROR_INVALID_NAME) { |
| uv__set_error(loop, UV_EACCES, errorno); |
| } else { |
| uv__set_sys_error(loop, errorno); |
| } |
| goto error; |
| } |
| |
| if (uv_set_pipe_handle(loop, handle, handle->accept_reqs[0].pipeHandle, 0)) { |
| uv__set_sys_error(loop, GetLastError()); |
| goto error; |
| } |
| |
| handle->pending_accepts = NULL; |
| handle->flags |= UV_HANDLE_PIPESERVER; |
| handle->flags |= UV_HANDLE_BOUND; |
| |
| return 0; |
| |
| error: |
| if (handle->name) { |
| free(handle->name); |
| handle->name = NULL; |
| } |
| |
| if (handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE) { |
| CloseHandle(handle->accept_reqs[0].pipeHandle); |
| handle->accept_reqs[0].pipeHandle = INVALID_HANDLE_VALUE; |
| } |
| |
| return -1; |
| } |
| |
| |
| static DWORD WINAPI pipe_connect_thread_proc(void* parameter) { |
| uv_loop_t* loop; |
| uv_pipe_t* handle; |
| uv_connect_t* req; |
| HANDLE pipeHandle = INVALID_HANDLE_VALUE; |
| DWORD duplex_flags; |
| |
| req = (uv_connect_t*) parameter; |
| assert(req); |
| handle = (uv_pipe_t*) req->handle; |
| assert(handle); |
| loop = handle->loop; |
| assert(loop); |
| |
| /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. */ |
| /* We wait for the pipe to become available with WaitNamedPipe. */ |
| while (WaitNamedPipeW(handle->name, 30000)) { |
| /* The pipe is now available, try to connect. */ |
| pipeHandle = open_named_pipe(handle->name, &duplex_flags); |
| if (pipeHandle != INVALID_HANDLE_VALUE) { |
| break; |
| } |
| |
| SwitchToThread(); |
| } |
| |
| if (pipeHandle != INVALID_HANDLE_VALUE && |
| !uv_set_pipe_handle(loop, handle, pipeHandle, duplex_flags)) { |
| SET_REQ_SUCCESS(req); |
| } else { |
| SET_REQ_ERROR(req, GetLastError()); |
| } |
| |
| /* Post completed */ |
| POST_COMPLETION_FOR_REQ(loop, req); |
| |
| return 0; |
| } |
| |
| |
| void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle, |
| const char* name, uv_connect_cb cb) { |
| uv_loop_t* loop = handle->loop; |
| int errorno, nameSize; |
| HANDLE pipeHandle = INVALID_HANDLE_VALUE; |
| DWORD duplex_flags; |
| |
| uv_req_init(loop, (uv_req_t*) req); |
| req->type = UV_CONNECT; |
| req->handle = (uv_stream_t*) handle; |
| req->cb = cb; |
| |
| /* Convert name to UTF16. */ |
| nameSize = uv_utf8_to_utf16(name, NULL, 0) * sizeof(WCHAR); |
| handle->name = (WCHAR*)malloc(nameSize); |
| if (!handle->name) { |
| uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); |
| } |
| |
| if (!uv_utf8_to_utf16(name, handle->name, nameSize / sizeof(WCHAR))) { |
| errorno = GetLastError(); |
| goto error; |
| } |
| |
| pipeHandle = open_named_pipe(handle->name, &duplex_flags); |
| if (pipeHandle == INVALID_HANDLE_VALUE) { |
| if (GetLastError() == ERROR_PIPE_BUSY) { |
| /* Wait for the server to make a pipe instance available. */ |
| if (!QueueUserWorkItem(&pipe_connect_thread_proc, |
| req, |
| WT_EXECUTELONGFUNCTION)) { |
| errorno = GetLastError(); |
| goto error; |
| } |
| |
| REGISTER_HANDLE_REQ(loop, handle, req); |
| handle->reqs_pending++; |
| |
| return; |
| } |
| |
| errorno = GetLastError(); |
| goto error; |
| } |
| |
| assert(pipeHandle != INVALID_HANDLE_VALUE); |
| |
| if (uv_set_pipe_handle(loop, |
| (uv_pipe_t*) req->handle, |
| pipeHandle, |
| duplex_flags)) { |
| errorno = GetLastError(); |
| goto error; |
| } |
| |
| SET_REQ_SUCCESS(req); |
| uv_insert_pending_req(loop, (uv_req_t*) req); |
| handle->reqs_pending++; |
| REGISTER_HANDLE_REQ(loop, handle, req); |
| return; |
| |
| error: |
| if (handle->name) { |
| free(handle->name); |
| handle->name = NULL; |
| } |
| |
| if (pipeHandle != INVALID_HANDLE_VALUE) { |
| CloseHandle(pipeHandle); |
| } |
| |
| /* Make this req pending reporting an error. */ |
| SET_REQ_ERROR(req, errorno); |
| uv_insert_pending_req(loop, (uv_req_t*) req); |
| handle->reqs_pending++; |
| REGISTER_HANDLE_REQ(loop, handle, req); |
| return; |
| } |
| |
| |
| /* Cleans up uv_pipe_t (server or connection) and all resources associated */ |
| /* with it. */ |
| void uv_pipe_cleanup(uv_loop_t* loop, uv_pipe_t* handle) { |
| int i; |
| HANDLE pipeHandle; |
| |
| if (handle->name) { |
| free(handle->name); |
| handle->name = NULL; |
| } |
| |
| if (handle->flags & UV_HANDLE_PIPESERVER) { |
| for (i = 0; i < handle->pending_instances; i++) { |
| pipeHandle = handle->accept_reqs[i].pipeHandle; |
| if (pipeHandle != INVALID_HANDLE_VALUE) { |
| CloseHandle(pipeHandle); |
| handle->accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE; |
| } |
| } |
| } |
| |
| if (handle->flags & UV_HANDLE_CONNECTION) { |
| handle->flags &= ~UV_HANDLE_WRITABLE; |
| eof_timer_destroy(handle); |
| } |
| |
| if ((handle->flags & UV_HANDLE_CONNECTION) |
| && handle->handle != INVALID_HANDLE_VALUE) { |
| CloseHandle(handle->handle); |
| handle->handle = INVALID_HANDLE_VALUE; |
| } |
| } |
| |
| |
| void uv_pipe_close(uv_loop_t* loop, uv_pipe_t* handle) { |
| if (handle->flags & UV_HANDLE_READING) { |
| handle->flags &= ~UV_HANDLE_READING; |
| DECREASE_ACTIVE_COUNT(loop, handle); |
| } |
| |
| if (handle->flags & UV_HANDLE_LISTENING) { |
| handle->flags &= ~UV_HANDLE_LISTENING; |
| DECREASE_ACTIVE_COUNT(loop, handle); |
| } |
| |
| uv_pipe_cleanup(loop, handle); |
| |
| if (handle->reqs_pending == 0) { |
| uv_want_endgame(loop, (uv_handle_t*) handle); |
| } |
| |
| handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE); |
| uv__handle_closing(handle); |
| } |
| |
| |
| static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle, |
| uv_pipe_accept_t* req, BOOL firstInstance) { |
| assert(handle->flags & UV_HANDLE_LISTENING); |
| |
| if (!firstInstance) { |
| assert(req->pipeHandle == INVALID_HANDLE_VALUE); |
| |
| req->pipeHandle = CreateNamedPipeW(handle->name, |
| PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, |
| PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, |
| PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL); |
| |
| if (req->pipeHandle == INVALID_HANDLE_VALUE) { |
| SET_REQ_ERROR(req, GetLastError()); |
| uv_insert_pending_req(loop, (uv_req_t*) req); |
| handle->reqs_pending++; |
| return; |
| } |
| |
| if (uv_set_pipe_handle(loop, handle, req->pipeHandle, 0)) { |
| CloseHandle(req->pipeHandle); |
| req->pipeHandle = INVALID_HANDLE_VALUE; |
| SET_REQ_ERROR(req, GetLastError()); |
| uv_insert_pending_req(loop, (uv_req_t*) req); |
| handle->reqs_pending++; |
| return; |
| } |
| } |
| |
| assert(req->pipeHandle != INVALID_HANDLE_VALUE); |
| |
| /* Prepare the overlapped structure. */ |
| memset(&(req->overlapped), 0, sizeof(req->overlapped)); |
| |
| if (!ConnectNamedPipe(req->pipeHandle, &req->overlapped) && |
| GetLastError() != ERROR_IO_PENDING) { |
| if (GetLastError() == ERROR_PIPE_CONNECTED) { |
| SET_REQ_SUCCESS(req); |
| } else { |
| CloseHandle(req->pipeHandle); |
| req->pipeHandle = INVALID_HANDLE_VALUE; |
| /* Make this req pending reporting an error. */ |
| SET_REQ_ERROR(req, GetLastError()); |
| } |
| uv_insert_pending_req(loop, (uv_req_t*) req); |
| handle->reqs_pending++; |
| return; |
| } |
| |
| handle->reqs_pending++; |
| } |
| |
| |
| int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) { |
| uv_loop_t* loop = server->loop; |
| uv_pipe_t* pipe_client; |
| uv_pipe_accept_t* req; |
| |
| if (server->ipc) { |
| if (!server->pending_ipc_info.socket_info) { |
| /* No valid pending sockets. */ |
| uv__set_sys_error(loop, WSAEWOULDBLOCK); |
| return -1; |
| } |
| |
| return uv_tcp_import((uv_tcp_t*)client, server->pending_ipc_info.socket_info, |
| server->pending_ipc_info.tcp_connection); |
| } else { |
| pipe_client = (uv_pipe_t*)client; |
| |
| /* Find a connection instance that has been connected, but not yet */ |
| /* accepted. */ |
| req = server->pending_accepts; |
| |
| if (!req) { |
| /* No valid connections found, so we error out. */ |
| uv__set_sys_error(loop, WSAEWOULDBLOCK); |
| return -1; |
| } |
| |
| /* Initialize the client handle and copy the pipeHandle to the client */ |
| uv_pipe_connection_init(pipe_client); |
| pipe_client->handle = req->pipeHandle; |
| pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE; |
| |
| /* Prepare the req to pick up a new connection */ |
| server->pending_accepts = req->next_pending; |
| req->next_pending = NULL; |
| req->pipeHandle = INVALID_HANDLE_VALUE; |
| |
| if (!(server->flags & UV__HANDLE_CLOSING)) { |
| uv_pipe_queue_accept(loop, server, req, FALSE); |
| } |
| } |
| |
| return 0; |
| } |
| |
| |
| /* Starts listening for connections for the given pipe. */ |
| int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { |
| uv_loop_t* loop = handle->loop; |
| int i; |
| |
| if (handle->flags & UV_HANDLE_LISTENING) { |
| handle->connection_cb = cb; |
| } |
| |
| if (!(handle->flags & UV_HANDLE_BOUND)) { |
| uv__set_artificial_error(loop, UV_EINVAL); |
| return -1; |
| } |
| |
| if (handle->flags & UV_HANDLE_READING) { |
| uv__set_artificial_error(loop, UV_EISCONN); |
| return -1; |
| } |
| |
| if (!(handle->flags & UV_HANDLE_PIPESERVER)) { |
| uv__set_artificial_error(loop, UV_ENOTSUP); |
| return -1; |
| } |
| |
| handle->flags |= UV_HANDLE_LISTENING; |
| INCREASE_ACTIVE_COUNT(loop, handle); |
| handle->connection_cb = cb; |
| |
| /* First pipe handle should have already been created in uv_pipe_bind */ |
| assert(handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE); |
| |
| for (i = 0; i < handle->pending_instances; i++) { |
| uv_pipe_queue_accept(loop, handle, &handle->accept_reqs[i], i == 0); |
| } |
| |
| return 0; |
| } |
| |
| |
| static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) { |
| int result; |
| DWORD bytes; |
| uv_read_t* req = (uv_read_t*) parameter; |
| uv_pipe_t* handle = (uv_pipe_t*) req->data; |
| uv_loop_t* loop = handle->loop; |
| |
| assert(req != NULL); |
| assert(req->type == UV_READ); |
| assert(handle->type == UV_NAMED_PIPE); |
| |
| result = ReadFile(handle->handle, |
| &uv_zero_, |
| 0, |
| &bytes, |
| NULL); |
| |
| if (!result) { |
| SET_REQ_ERROR(req, GetLastError()); |
| } |
| |
| POST_COMPLETION_FOR_REQ(loop, req); |
| return 0; |
| } |
| |
| |
| static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) { |
| int result; |
| DWORD bytes; |
| uv_write_t* req = (uv_write_t*) parameter; |
| uv_pipe_t* handle = (uv_pipe_t*) req->handle; |
| uv_loop_t* loop = handle->loop; |
| |
| assert(req != NULL); |
| assert(req->type == UV_WRITE); |
| assert(handle->type == UV_NAMED_PIPE); |
| assert(req->write_buffer.base); |
| |
| result = WriteFile(handle->handle, |
| req->write_buffer.base, |
| req->write_buffer.len, |
| &bytes, |
| NULL); |
| |
| if (!result) { |
| SET_REQ_ERROR(req, GetLastError()); |
| } |
| |
| POST_COMPLETION_FOR_REQ(loop, req); |
| return 0; |
| } |
| |
| |
| static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) { |
| uv_read_t* req; |
| uv_tcp_t* handle; |
| |
| req = (uv_read_t*) context; |
| assert(req != NULL); |
| handle = (uv_tcp_t*)req->data; |
| assert(handle != NULL); |
| assert(!timed_out); |
| |
| if (!PostQueuedCompletionStatus(handle->loop->iocp, |
| req->overlapped.InternalHigh, |
| 0, |
| &req->overlapped)) { |
| uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus"); |
| } |
| } |
| |
| |
| static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) { |
| uv_write_t* req; |
| uv_tcp_t* handle; |
| |
| req = (uv_write_t*) context; |
| assert(req != NULL); |
| handle = (uv_tcp_t*)req->handle; |
| assert(handle != NULL); |
| assert(!timed_out); |
| |
| if (!PostQueuedCompletionStatus(handle->loop->iocp, |
| req->overlapped.InternalHigh, |
| 0, |
| &req->overlapped)) { |
| uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus"); |
| } |
| } |
| |
| |
| static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) { |
| uv_read_t* req; |
| int result; |
| |
| assert(handle->flags & UV_HANDLE_READING); |
| assert(!(handle->flags & UV_HANDLE_READ_PENDING)); |
| |
| assert(handle->handle != INVALID_HANDLE_VALUE); |
| |
| req = &handle->read_req; |
| |
| if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) { |
| if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc, |
| req, |
| WT_EXECUTELONGFUNCTION)) { |
| /* Make this req pending reporting an error. */ |
| SET_REQ_ERROR(req, GetLastError()); |
| goto error; |
| } |
| } else { |
| memset(&req->overlapped, 0, sizeof(req->overlapped)); |
| if (handle->flags & UV_HANDLE_EMULATE_IOCP) { |
| req->overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1); |
| } |
| |
| /* Do 0-read */ |
| result = ReadFile(handle->handle, |
| &uv_zero_, |
| 0, |
| NULL, |
| &req->overlapped); |
| |
| if (!result && GetLastError() != ERROR_IO_PENDING) { |
| /* Make this req pending reporting an error. */ |
| SET_REQ_ERROR(req, GetLastError()); |
| goto error; |
| } |
| |
| if (handle->flags & UV_HANDLE_EMULATE_IOCP) { |
| if (!req->event_handle) { |
| req->event_handle = CreateEvent(NULL, 0, 0, NULL); |
| if (!req->event_handle) { |
| uv_fatal_error(GetLastError(), "CreateEvent"); |
| } |
| } |
| if (req->wait_handle == INVALID_HANDLE_VALUE) { |
| if (!RegisterWaitForSingleObject(&req->wait_handle, |
| req->overlapped.hEvent, post_completion_read_wait, (void*) req, |
| INFINITE, WT_EXECUTEINWAITTHREAD)) { |
| SET_REQ_ERROR(req, GetLastError()); |
| goto error; |
| } |
| } |
| } |
| } |
| |
| /* Start the eof timer if there is one */ |
| eof_timer_start(handle); |
| handle->flags |= UV_HANDLE_READ_PENDING; |
| handle->reqs_pending++; |
| return; |
| |
| error: |
| uv_insert_pending_req(loop, (uv_req_t*)req); |
| handle->flags |= UV_HANDLE_READ_PENDING; |
| handle->reqs_pending++; |
| } |
| |
| |
| static int uv_pipe_read_start_impl(uv_pipe_t* handle, uv_alloc_cb alloc_cb, |
| uv_read_cb read_cb, uv_read2_cb read2_cb) { |
| uv_loop_t* loop = handle->loop; |
| |
| handle->flags |= UV_HANDLE_READING; |
| INCREASE_ACTIVE_COUNT(loop, handle); |
| handle->read_cb = read_cb; |
| handle->read2_cb = read2_cb; |
| handle->alloc_cb = alloc_cb; |
| |
| /* If reading was stopped and then started again, there could still be a */ |
| /* read request pending. */ |
| if (!(handle->flags & UV_HANDLE_READ_PENDING)) |
| uv_pipe_queue_read(loop, handle); |
| |
| return 0; |
| } |
| |
| |
| int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, |
| uv_read_cb read_cb) { |
| return uv_pipe_read_start_impl(handle, alloc_cb, read_cb, NULL); |
| } |
| |
| |
| int uv_pipe_read2_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, |
| uv_read2_cb read_cb) { |
| return uv_pipe_read_start_impl(handle, alloc_cb, NULL, read_cb); |
| } |
| |
| |
| static void uv_insert_non_overlapped_write_req(uv_pipe_t* handle, |
| uv_write_t* req) { |
| req->next_req = NULL; |
| if (handle->non_overlapped_writes_tail) { |
| req->next_req = |
| handle->non_overlapped_writes_tail->next_req; |
| handle->non_overlapped_writes_tail->next_req = (uv_req_t*)req; |
| handle->non_overlapped_writes_tail = req; |
| } else { |
| req->next_req = (uv_req_t*)req; |
| handle->non_overlapped_writes_tail = req; |
| } |
| } |
| |
| |
| static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) { |
| uv_write_t* req; |
| |
| if (handle->non_overlapped_writes_tail) { |
| req = (uv_write_t*)handle->non_overlapped_writes_tail->next_req; |
| |
| if (req == handle->non_overlapped_writes_tail) { |
| handle->non_overlapped_writes_tail = NULL; |
| } else { |
| handle->non_overlapped_writes_tail->next_req = |
| req->next_req; |
| } |
| |
| return req; |
| } else { |
| /* queue empty */ |
| return NULL; |
| } |
| } |
| |
| |
| static void uv_queue_non_overlapped_write(uv_pipe_t* handle) { |
| uv_write_t* req = uv_remove_non_overlapped_write_req(handle); |
| if (req) { |
| if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc, |
| req, |
| WT_EXECUTELONGFUNCTION)) { |
| uv_fatal_error(GetLastError(), "QueueUserWorkItem"); |
| } |
| } |
| } |
| |
| |
| static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req, |
| uv_pipe_t* handle, uv_buf_t bufs[], int bufcnt, |
| uv_stream_t* send_handle, uv_write_cb cb) { |
| int result; |
| uv_tcp_t* tcp_send_handle; |
| uv_write_t* ipc_header_req; |
| uv_ipc_frame_uv_stream ipc_frame; |
| |
| if (bufcnt != 1 && (bufcnt != 0 || !send_handle)) { |
| uv__set_artificial_error(loop, UV_ENOTSUP); |
| return -1; |
| } |
| |
| /* Only TCP handles are supported for sharing. */ |
| if (send_handle && ((send_handle->type != UV_TCP) || |
| (!(send_handle->flags & UV_HANDLE_BOUND) && |
| !(send_handle->flags & UV_HANDLE_CONNECTION)))) { |
| uv__set_artificial_error(loop, UV_ENOTSUP); |
| return -1; |
| } |
| |
| assert(handle->handle != INVALID_HANDLE_VALUE); |
| |
| uv_req_init(loop, (uv_req_t*) req); |
| req->type = UV_WRITE; |
| req->handle = (uv_stream_t*) handle; |
| req->cb = cb; |
| req->ipc_header = 0; |
| req->event_handle = NULL; |
| req->wait_handle = INVALID_HANDLE_VALUE; |
| memset(&req->overlapped, 0, sizeof(req->overlapped)); |
| |
| if (handle->ipc) { |
| assert(!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)); |
| ipc_frame.header.flags = 0; |
| |
| /* Use the IPC framing protocol. */ |
| if (send_handle) { |
| tcp_send_handle = (uv_tcp_t*)send_handle; |
| |
| if (uv_tcp_duplicate_socket(tcp_send_handle, handle->ipc_pid, |
| &ipc_frame.socket_info)) { |
| return -1; |
| } |
| ipc_frame.header.flags |= UV_IPC_TCP_SERVER; |
| |
| if (tcp_send_handle->flags & UV_HANDLE_CONNECTION) { |
| ipc_frame.header.flags |= UV_IPC_TCP_CONNECTION; |
| } |
| } |
| |
| if (bufcnt == 1) { |
| ipc_frame.header.flags |= UV_IPC_RAW_DATA; |
| ipc_frame.header.raw_data_length = bufs[0].len; |
| } |
| |
| /* |
| * Use the provided req if we're only doing a single write. |
| * If we're doing multiple writes, use ipc_header_write_req to do |
| * the first write, and then use the provided req for the second write. |
| */ |
| if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) { |
| ipc_header_req = req; |
| } else { |
| /* |
| * Try to use the preallocated write req if it's available. |
| * Otherwise allocate a new one. |
| */ |
| if (handle->ipc_header_write_req.type != UV_WRITE) { |
| ipc_header_req = (uv_write_t*)&handle->ipc_header_write_req; |
| } else { |
| ipc_header_req = (uv_write_t*)malloc(sizeof(uv_write_t)); |
| if (!ipc_header_req) { |
| uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); |
| } |
| } |
| |
| uv_req_init(loop, (uv_req_t*) ipc_header_req); |
| ipc_header_req->type = UV_WRITE; |
| ipc_header_req->handle = (uv_stream_t*) handle; |
| ipc_header_req->cb = NULL; |
| ipc_header_req->ipc_header = 1; |
| } |
| |
| /* Write the header or the whole frame. */ |
| memset(&ipc_header_req->overlapped, 0, sizeof(ipc_header_req->overlapped)); |
| |
| result = WriteFile(handle->handle, |
| &ipc_frame, |
| ipc_frame.header.flags & UV_IPC_TCP_SERVER ? |
| sizeof(ipc_frame) : sizeof(ipc_frame.header), |
| NULL, |
| &ipc_header_req->overlapped); |
| if (!result && GetLastError() != ERROR_IO_PENDING) { |
| uv__set_sys_error(loop, GetLastError()); |
| return -1; |
| } |
| |
| if (result) { |
| /* Request completed immediately. */ |
| ipc_header_req->queued_bytes = 0; |
| } else { |
| /* Request queued by the kernel. */ |
| ipc_header_req->queued_bytes = ipc_frame.header.flags & UV_IPC_TCP_SERVER ? |
| sizeof(ipc_frame) : sizeof(ipc_frame.header); |
| handle->write_queue_size += ipc_header_req->queued_bytes; |
| } |
| |
| REGISTER_HANDLE_REQ(loop, handle, ipc_header_req); |
| handle->reqs_pending++; |
| handle->write_reqs_pending++; |
| |
| /* If we don't have any raw data to write - we're done. */ |
| if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) { |
| return 0; |
| } |
| } |
| |
| if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) { |
| req->write_buffer = bufs[0]; |
| uv_insert_non_overlapped_write_req(handle, req); |
| if (handle->write_reqs_pending == 0) { |
| uv_queue_non_overlapped_write(handle); |
| } |
| |
| /* Request queued by the kernel. */ |
| req->queued_bytes = uv_count_bufs(bufs, bufcnt); |
| handle->write_queue_size += req->queued_bytes; |
| } else { |
| result = WriteFile(handle->handle, |
| bufs[0].base, |
| bufs[0].len, |
| NULL, |
| &req->overlapped); |
| |
| if (!result && GetLastError() != ERROR_IO_PENDING) { |
| uv__set_sys_error(loop, GetLastError()); |
| return -1; |
| } |
| |
| if (result) { |
| /* Request completed immediately. */ |
| req->queued_bytes = 0; |
| } else { |
| /* Request queued by the kernel. */ |
| req->queued_bytes = uv_count_bufs(bufs, bufcnt); |
| handle->write_queue_size += req->queued_bytes; |
| } |
| |
| if (handle->flags & UV_HANDLE_EMULATE_IOCP) { |
| req->event_handle = CreateEvent(NULL, 0, 0, NULL); |
| if (!req->event_handle) { |
| uv_fatal_error(GetLastError(), "CreateEvent"); |
| } |
| if (!RegisterWaitForSingleObject(&req->wait_handle, |
| req->overlapped.hEvent, post_completion_write_wait, (void*) req, |
| INFINITE, WT_EXECUTEINWAITTHREAD)) { |
| uv__set_sys_error(loop, GetLastError()); |
| return -1; |
| } |
| } |
| } |
| |
| REGISTER_HANDLE_REQ(loop, handle, req); |
| handle->reqs_pending++; |
| handle->write_reqs_pending++; |
| |
| return 0; |
| } |
| |
| |
| int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, |
| uv_buf_t bufs[], int bufcnt, uv_write_cb cb) { |
| return uv_pipe_write_impl(loop, req, handle, bufs, bufcnt, NULL, cb); |
| } |
| |
| |
| int uv_pipe_write2(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, |
| uv_buf_t bufs[], int bufcnt, uv_stream_t* send_handle, uv_write_cb cb) { |
| if (!handle->ipc) { |
| uv__set_artificial_error(loop, UV_EINVAL); |
| return -1; |
| } |
| |
| return uv_pipe_write_impl(loop, req, handle, bufs, bufcnt, send_handle, cb); |
| } |
| |
| |
| static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle, |
| uv_buf_t buf) { |
| /* If there is an eof timer running, we don't need it any more, */ |
| /* so discard it. */ |
| eof_timer_destroy(handle); |
| |
| handle->flags &= ~UV_HANDLE_READABLE; |
| uv_read_stop((uv_stream_t*) handle); |
| |
| uv__set_artificial_error(loop, UV_EOF); |
| if (handle->read2_cb) { |
| handle->read2_cb(handle, -1, uv_null_buf_, UV_UNKNOWN_HANDLE); |
| } else { |
| handle->read_cb((uv_stream_t*) handle, -1, uv_null_buf_); |
| } |
| } |
| |
| |
| static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error, |
| uv_buf_t buf) { |
| /* If there is an eof timer running, we don't need it any more, */ |
| /* so discard it. */ |
| eof_timer_destroy(handle); |
| |
| uv_read_stop((uv_stream_t*) handle); |
| |
| uv__set_sys_error(loop, error); |
| if (handle->read2_cb) { |
| handle->read2_cb(handle, -1, buf, UV_UNKNOWN_HANDLE); |
| } else { |
| handle->read_cb((uv_stream_t*)handle, -1, buf); |
| } |
| } |
| |
| |
| static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle, |
| int error, uv_buf_t buf) { |
| if (error == ERROR_BROKEN_PIPE) { |
| uv_pipe_read_eof(loop, handle, buf); |
| } else { |
| uv_pipe_read_error(loop, handle, error, buf); |
| } |
| } |
| |
| |
| void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, |
| uv_req_t* req) { |
| DWORD bytes, avail; |
| uv_buf_t buf; |
| uv_ipc_frame_uv_stream ipc_frame; |
| |
| assert(handle->type == UV_NAMED_PIPE); |
| |
| handle->flags &= ~UV_HANDLE_READ_PENDING; |
| eof_timer_stop(handle); |
| |
| if (!REQ_SUCCESS(req)) { |
| /* An error occurred doing the 0-read. */ |
| if (handle->flags & UV_HANDLE_READING) { |
| uv_pipe_read_error_or_eof(loop, |
| handle, |
| GET_REQ_ERROR(req), |
| uv_null_buf_); |
| } |
| } else { |
| /* Do non-blocking reads until the buffer is empty */ |
| while (handle->flags & UV_HANDLE_READING) { |
| if (!PeekNamedPipe(handle->handle, |
| NULL, |
| 0, |
| NULL, |
| &avail, |
| NULL)) { |
| uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_); |
| break; |
| } |
| |
| if (avail == 0) { |
| /* There is nothing to read after all. */ |
| break; |
| } |
| |
| if (handle->ipc) { |
| /* Use the IPC framing protocol to read the incoming data. */ |
| if (handle->remaining_ipc_rawdata_bytes == 0) { |
| /* We're reading a new frame. First, read the header. */ |
| assert(avail >= sizeof(ipc_frame.header)); |
| |
| if (!ReadFile(handle->handle, |
| &ipc_frame.header, |
| sizeof(ipc_frame.header), |
| &bytes, |
| NULL)) { |
| uv_pipe_read_error_or_eof(loop, handle, GetLastError(), |
| uv_null_buf_); |
| break; |
| } |
| |
| assert(bytes == sizeof(ipc_frame.header)); |
| assert(ipc_frame.header.flags <= (UV_IPC_TCP_SERVER | UV_IPC_RAW_DATA | |
| UV_IPC_TCP_CONNECTION)); |
| |
| if (ipc_frame.header.flags & UV_IPC_TCP_SERVER) { |
| assert(avail - sizeof(ipc_frame.header) >= |
| sizeof(ipc_frame.socket_info)); |
| |
| /* Read the TCP socket info. */ |
| if (!ReadFile(handle->handle, |
| &ipc_frame.socket_info, |
| sizeof(ipc_frame) - sizeof(ipc_frame.header), |
| &bytes, |
| NULL)) { |
| uv_pipe_read_error_or_eof(loop, handle, GetLastError(), |
| uv_null_buf_); |
| break; |
| } |
| |
| assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header)); |
| |
| /* Store the pending socket info. */ |
| assert(!handle->pending_ipc_info.socket_info); |
| handle->pending_ipc_info.socket_info = |
| (WSAPROTOCOL_INFOW*)malloc(sizeof(*(handle->pending_ipc_info.socket_info))); |
| if (!handle->pending_ipc_info.socket_info) { |
| uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); |
| } |
| |
| *(handle->pending_ipc_info.socket_info) = ipc_frame.socket_info; |
| handle->pending_ipc_info.tcp_connection = |
| ipc_frame.header.flags & UV_IPC_TCP_CONNECTION; |
| } |
| |
| if (ipc_frame.header.flags & UV_IPC_RAW_DATA) { |
| handle->remaining_ipc_rawdata_bytes = |
| ipc_frame.header.raw_data_length; |
| continue; |
| } |
| } else { |
| avail = min(avail, (DWORD)handle->remaining_ipc_rawdata_bytes); |
| } |
| } |
| |
| buf = handle->alloc_cb((uv_handle_t*) handle, avail); |
| assert(buf.len > 0); |
| |
| if (ReadFile(handle->handle, |
| buf.base, |
| buf.len, |
| &bytes, |
| NULL)) { |
| /* Successful read */ |
| if (handle->ipc) { |
| assert(handle->remaining_ipc_rawdata_bytes >= bytes); |
| handle->remaining_ipc_rawdata_bytes = |
| handle->remaining_ipc_rawdata_bytes - bytes; |
| if (handle->read2_cb) { |
| handle->read2_cb(handle, bytes, buf, |
| handle->pending_ipc_info.socket_info ? UV_TCP : UV_UNKNOWN_HANDLE); |
| } else if (handle->read_cb) { |
| handle->read_cb((uv_stream_t*)handle, bytes, buf); |
| } |
| |
| if (handle->pending_ipc_info.socket_info) { |
| free(handle->pending_ipc_info.socket_info); |
| handle->pending_ipc_info.socket_info = NULL; |
| } |
| } else { |
| handle->read_cb((uv_stream_t*)handle, bytes, buf); |
| } |
| |
| /* Read again only if bytes == buf.len */ |
| if (bytes <= buf.len) { |
| break; |
| } |
| } else { |
| uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_); |
| break; |
| } |
| } |
| |
| /* Post another 0-read if still reading and not closing. */ |
| if ((handle->flags & UV_HANDLE_READING) && |
| !(handle->flags & UV_HANDLE_READ_PENDING)) { |
| uv_pipe_queue_read(loop, handle); |
| } |
| } |
| |
| DECREASE_PENDING_REQ_COUNT(handle); |
| } |
| |
| |
| void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle, |
| uv_write_t* req) { |
| assert(handle->type == UV_NAMED_PIPE); |
| |
| assert(handle->write_queue_size >= req->queued_bytes); |
| handle->write_queue_size -= req->queued_bytes; |
| |
| UNREGISTER_HANDLE_REQ(loop, handle, req); |
| |
| if (handle->flags & UV_HANDLE_EMULATE_IOCP) { |
| if (req->wait_handle != INVALID_HANDLE_VALUE) { |
| UnregisterWait(req->wait_handle); |
| req->wait_handle = INVALID_HANDLE_VALUE; |
| } |
| if (req->event_handle) { |
| CloseHandle(req->event_handle); |
| req->event_handle = NULL; |
| } |
| } |
| |
| if (req->ipc_header) { |
| if (req == &handle->ipc_header_write_req) { |
| req->type = UV_UNKNOWN_REQ; |
| } else { |
| free(req); |
| } |
| } else { |
| if (req->cb) { |
| if (!REQ_SUCCESS(req)) { |
| uv__set_sys_error(loop, GET_REQ_ERROR(req)); |
| ((uv_write_cb)req->cb)(req, -1); |
| } else { |
| ((uv_write_cb)req->cb)(req, 0); |
| } |
| } |
| } |
| |
| handle->write_reqs_pending--; |
| |
| if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE && |
| handle->non_overlapped_writes_tail) { |
| assert(handle->write_reqs_pending > 0); |
| uv_queue_non_overlapped_write(handle); |
| } |
| |
| if (handle->shutdown_req != NULL && |
| handle->write_reqs_pending == 0) { |
| uv_want_endgame(loop, (uv_handle_t*)handle); |
| } |
| |
| DECREASE_PENDING_REQ_COUNT(handle); |
| } |
| |
| |
| void uv_process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle, |
| uv_req_t* raw_req) { |
| uv_pipe_accept_t* req = (uv_pipe_accept_t*) raw_req; |
| |
| assert(handle->type == UV_NAMED_PIPE); |
| |
| if (REQ_SUCCESS(req)) { |
| assert(req->pipeHandle != INVALID_HANDLE_VALUE); |
| req->next_pending = handle->pending_accepts; |
| handle->pending_accepts = req; |
| |
| if (handle->connection_cb) { |
| handle->connection_cb((uv_stream_t*)handle, 0); |
| } |
| } else { |
| if (req->pipeHandle != INVALID_HANDLE_VALUE) { |
| CloseHandle(req->pipeHandle); |
| req->pipeHandle = INVALID_HANDLE_VALUE; |
| } |
| if (!(handle->flags & UV__HANDLE_CLOSING)) { |
| uv_pipe_queue_accept(loop, handle, req, FALSE); |
| } |
| } |
| |
| DECREASE_PENDING_REQ_COUNT(handle); |
| } |
| |
| |
| void uv_process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle, |
| uv_connect_t* req) { |
| assert(handle->type == UV_NAMED_PIPE); |
| |
| UNREGISTER_HANDLE_REQ(loop, handle, req); |
| |
| if (req->cb) { |
| if (REQ_SUCCESS(req)) { |
| uv_pipe_connection_init(handle); |
| ((uv_connect_cb)req->cb)(req, 0); |
| } else { |
| uv__set_sys_error(loop, GET_REQ_ERROR(req)); |
| ((uv_connect_cb)req->cb)(req, -1); |
| } |
| } |
| |
| DECREASE_PENDING_REQ_COUNT(handle); |
| } |
| |
| |
| void uv_process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle, |
| uv_shutdown_t* req) { |
| assert(handle->type == UV_NAMED_PIPE); |
| |
| UNREGISTER_HANDLE_REQ(loop, handle, req); |
| |
| if (handle->flags & UV_HANDLE_READABLE) { |
| /* Initialize and optionally start the eof timer. Only do this if the */ |
| /* pipe is readable and we haven't seen EOF come in ourselves. */ |
| eof_timer_init(handle); |
| |
| /* If reading start the timer right now. */ |
| /* Otherwise uv_pipe_queue_read will start it. */ |
| if (handle->flags & UV_HANDLE_READ_PENDING) { |
| eof_timer_start(handle); |
| } |
| |
| } else { |
| /* This pipe is not readable. We can just close it to let the other end */ |
| /* know that we're done writing. */ |
| CloseHandle(handle->handle); |
| handle->handle = INVALID_HANDLE_VALUE; |
| } |
| |
| if (req->cb) { |
| req->cb(req, 0); |
| } |
| |
| DECREASE_PENDING_REQ_COUNT(handle); |
| } |
| |
| |
| static void eof_timer_init(uv_pipe_t* pipe) { |
| int r; |
| |
| assert(pipe->eof_timer == NULL); |
| assert(pipe->flags & UV_HANDLE_CONNECTION); |
| |
| pipe->eof_timer = (uv_timer_t*) malloc(sizeof *pipe->eof_timer); |
| |
| r = uv_timer_init(pipe->loop, pipe->eof_timer); |
| assert(r == 0); /* timers can't fail */ |
| pipe->eof_timer->data = pipe; |
| uv_unref((uv_handle_t*) pipe->eof_timer); |
| } |
| |
| |
| static void eof_timer_start(uv_pipe_t* pipe) { |
| assert(pipe->flags & UV_HANDLE_CONNECTION); |
| |
| if (pipe->eof_timer != NULL) { |
| uv_timer_start(pipe->eof_timer, eof_timer_cb, eof_timeout, 0); |
| } |
| } |
| |
| |
| static void eof_timer_stop(uv_pipe_t* pipe) { |
| assert(pipe->flags & UV_HANDLE_CONNECTION); |
| |
| if (pipe->eof_timer != NULL) { |
| uv_timer_stop(pipe->eof_timer); |
| } |
| } |
| |
| |
| static void eof_timer_cb(uv_timer_t* timer, int status) { |
| uv_pipe_t* pipe = (uv_pipe_t*) timer->data; |
| uv_loop_t* loop = timer->loop; |
| |
| assert(status == 0); /* timers can't fail */ |
| assert(pipe->type == UV_NAMED_PIPE); |
| |
| /* This should always be true, since we start the timer only */ |
| /* in uv_pipe_queue_read after successfully calling ReadFile, */ |
| /* or in uv_process_pipe_shutdown_req if a read is pending, */ |
| /* and we always immediately stop the timer in */ |
| /* uv_process_pipe_read_req. */ |
| assert(pipe->flags & UV_HANDLE_READ_PENDING); |
| |
| /* If there are many packets coming off the iocp then the timer callback */ |
| /* may be called before the read request is coming off the queue. */ |
| /* Therefore we check here if the read request has completed but will */ |
| /* be processed later. */ |
| if ((pipe->flags & UV_HANDLE_READ_PENDING) && |
| HasOverlappedIoCompleted(&pipe->read_req.overlapped)) { |
| return; |
| } |
| |
| /* Force both ends off the pipe. */ |
| CloseHandle(pipe->handle); |
| pipe->handle = INVALID_HANDLE_VALUE; |
| |
| /* Stop reading, so the pending read that is going to fail will */ |
| /* not be reported to the user. */ |
| uv_read_stop((uv_stream_t*) pipe); |
| |
| /* Report the eof and update flags. This will get reported even if the */ |
| /* user stopped reading in the meantime. TODO: is that okay? */ |
| uv_pipe_read_eof(loop, pipe, uv_null_buf_); |
| } |
| |
| |
| static void eof_timer_destroy(uv_pipe_t* pipe) { |
| assert(pipe->flags && UV_HANDLE_CONNECTION); |
| |
| if (pipe->eof_timer) { |
| uv_close((uv_handle_t*) pipe->eof_timer, eof_timer_close_cb); |
| pipe->eof_timer = NULL; |
| } |
| } |
| |
| |
| static void eof_timer_close_cb(uv_handle_t* handle) { |
| assert(handle->type == UV_TIMER); |
| free(handle); |
| } |
| |
| |
| int uv_pipe_open(uv_pipe_t* pipe, uv_file file) { |
| HANDLE os_handle = (HANDLE)_get_osfhandle(file); |
| |
| if (os_handle == INVALID_HANDLE_VALUE || |
| uv_set_pipe_handle(pipe->loop, pipe, os_handle, 0) == -1) { |
| uv__set_sys_error(pipe->loop, WSAEINVAL); |
| return -1; |
| } |
| |
| uv_pipe_connection_init(pipe); |
| pipe->handle = os_handle; |
| pipe->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE; |
| |
| if (pipe->ipc) { |
| assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)); |
| pipe->ipc_pid = uv_parent_pid(); |
| assert(pipe->ipc_pid != -1); |
| } |
| return 0; |
| } |