-
-
Notifications
You must be signed in to change notification settings - Fork 35
moving from readable-streams to streamx #42
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,8 +13,20 @@ | |
| return NULL; \ | ||
| } | ||
|
|
||
| #define NAPI_MAKE_CALLBACK_AND_ALLOC(env, nil, ctx, cb, n, argv, res, nread) \ | ||
| if (napi_make_callback(env, nil, ctx, cb, n, argv, &res) == napi_pending_exception) { \ | ||
| #define NAPI_MAKE_CALLBACK_AND_ALLOC(env, nil, ctx, cb, n, argv, nread) \ | ||
| napi_value res; \ | ||
| napi_status stat = napi_make_callback(env, nil, ctx, cb, n, argv, &res); \ | ||
| if (stat == napi_ok) { \ | ||
| bool is_buf; \ | ||
| napi_is_buffer(env, res, &is_buf); \ | ||
| if (is_buf) { \ | ||
| UTP_NAPI_BUFFER_ALLOC(self, res, nread) \ | ||
| } else { \ | ||
| size_t size = nread <= 0 ? 0 : nread; \ | ||
| self->buf.base += size; \ | ||
| self->buf.len -= size; \ | ||
| } \ | ||
| } else if (stat == napi_pending_exception) { \ | ||
| napi_value fatal_exception; \ | ||
| napi_get_and_clear_last_exception(env, &fatal_exception); \ | ||
| napi_fatal_exception(env, fatal_exception); \ | ||
|
|
@@ -25,7 +37,7 @@ | |
| }) \ | ||
| } \ | ||
| } else { \ | ||
| UTP_NAPI_BUFFER_ALLOC(self, res, nread) \ | ||
| printf("[UTP-NATIVE]: Unexpected result of callback %i\n", stat); \ | ||
| } | ||
|
|
||
| #define UTP_NAPI_CALLBACK(fn, src) \ | ||
|
|
@@ -43,14 +55,8 @@ | |
| char *buf; \ | ||
| size_t buf_len; \ | ||
| napi_get_buffer_info(env, ret, (void **) &buf, &buf_len); \ | ||
| if (buf_len == 0) { \ | ||
| size_t size = nread <= 0 ? 0 : nread; \ | ||
| self->buf.base += size; \ | ||
| self->buf.len -= size; \ | ||
| } else { \ | ||
| self->buf.base = buf; \ | ||
| self->buf.len = buf_len; \ | ||
| } | ||
| self->buf.base = buf; \ | ||
| self->buf.len = buf_len; \ | ||
|
|
||
| typedef struct { | ||
| uint32_t min_recv_packet_size; | ||
|
|
@@ -71,6 +77,7 @@ typedef struct { | |
| napi_ref on_close; | ||
| napi_ref on_connect; | ||
| napi_ref realloc; | ||
| bool destroyed; | ||
| } utp_napi_connection_t; | ||
|
|
||
| typedef struct { | ||
|
|
@@ -175,12 +182,11 @@ on_uv_read (uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct s | |
| utp_napi_parse_address((struct sockaddr *) addr, ip, &port); | ||
|
|
||
| UTP_NAPI_CALLBACK(self->on_message, { | ||
| napi_value ret; | ||
| napi_value argv[3]; | ||
| napi_create_int32(env, nread, &(argv[0])); | ||
| napi_create_uint32(env, port, &(argv[1])); | ||
| napi_create_string_utf8(env, ip, NAPI_AUTO_LENGTH, &(argv[2])); | ||
| NAPI_MAKE_CALLBACK_AND_ALLOC(env, NULL, ctx, callback, 3, argv, ret, nread) | ||
| NAPI_MAKE_CALLBACK_AND_ALLOC(env, NULL, ctx, callback, 3, argv, nread) | ||
| }) | ||
| } | ||
|
|
||
|
|
@@ -189,7 +195,12 @@ on_uv_close (uv_handle_t *handle) { | |
| utp_napi_t *self = (utp_napi_t *) handle->data; | ||
|
|
||
| self->pending_close--; | ||
| if (self->pending_close > 0) return; | ||
| if (self->pending_close == 1) { | ||
| uv_close((uv_handle_t *) &(self->handle), on_uv_close); | ||
| } | ||
| if (self->pending_close > 0) { | ||
| return; | ||
| } | ||
|
|
||
| UTP_NAPI_CALLBACK(self->on_close, { | ||
| NAPI_MAKE_CALLBACK(env, NULL, ctx, callback, 0, NULL, NULL); | ||
|
|
@@ -219,13 +230,20 @@ on_utp_firewall (utp_callback_arguments *a) { | |
|
|
||
| inline static void | ||
| utp_napi_connection_destroy (utp_napi_connection_t *self) { | ||
| if (self->destroyed) { | ||
| return; | ||
| } | ||
| if (self->buf.base == NULL) { | ||
| return; | ||
| } | ||
| UTP_NAPI_CALLBACK(self->on_close, { | ||
| NAPI_MAKE_CALLBACK(env, NULL, ctx, callback, 0, NULL, NULL) | ||
| }) | ||
|
|
||
| self->env = env; | ||
| self->buf.base = NULL; | ||
| self->buf.len = 0; | ||
| self->destroyed = true; | ||
|
|
||
| napi_delete_reference(self->env, self->ctx); | ||
| napi_delete_reference(self->env, self->on_read); | ||
|
|
@@ -258,12 +276,14 @@ on_utp_state_change (utp_callback_arguments *a) { | |
| } | ||
|
|
||
| case UTP_STATE_EOF: { | ||
| if (self->destroyed) { | ||
| return 0; | ||
| } | ||
| if (self->recv_packet_size) { | ||
| UTP_NAPI_CALLBACK(self->on_read, { | ||
| napi_value ret; | ||
| napi_value argv[1]; | ||
| napi_create_uint32(env, self->recv_packet_size, &(argv[0])); | ||
| NAPI_MAKE_CALLBACK_AND_ALLOC(env, NULL, ctx, callback, 1, argv, ret, self->recv_packet_size) | ||
| NAPI_MAKE_CALLBACK_AND_ALLOC(env, NULL, ctx, callback, 1, argv, self->recv_packet_size) | ||
| self->recv_packet_size = 0; | ||
| }) | ||
| } | ||
|
|
@@ -342,10 +362,9 @@ on_utp_read (utp_callback_arguments *a) { | |
| } | ||
|
|
||
| UTP_NAPI_CALLBACK(self->on_read, { | ||
| napi_value ret; | ||
| napi_value argv[1]; | ||
| napi_create_uint32(env, self->recv_packet_size, &(argv[0])); | ||
| NAPI_MAKE_CALLBACK_AND_ALLOC(env, NULL, ctx, callback, 1, argv, ret, self->recv_packet_size) | ||
| NAPI_MAKE_CALLBACK_AND_ALLOC(env, NULL, ctx, callback, 1, argv, self->recv_packet_size) | ||
| self->recv_packet_size = 0; | ||
| }) | ||
|
|
||
|
|
@@ -425,7 +444,6 @@ NAPI_METHOD(utp_napi_close) { | |
| err = uv_udp_recv_stop(&(self->handle)); | ||
| if (err < 0) UTP_NAPI_THROW(err) | ||
|
|
||
| uv_close((uv_handle_t *) &(self->handle), on_uv_close); | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This handle is now closed serially. If I leave his in parallel, it causes a segfault. Probably a timing error, but I couldn't figure out its cause. |
||
| uv_close((uv_handle_t *) &(self->timer), on_uv_close); | ||
|
|
||
| return NULL; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,8 +5,6 @@ const events = require('events') | |
| const dns = require('dns') | ||
| const set = require('unordered-set') | ||
|
|
||
| const EMPTY = Buffer.alloc(0) | ||
|
|
||
| module.exports = UTP | ||
|
|
||
| function UTP (opts) { | ||
|
|
@@ -132,16 +130,17 @@ UTP.prototype._closeMaybe = function () { | |
| if (this._closing && !this.connections.length && !this._sending.length && this._inited && !this._closed) { | ||
| this._closed = true | ||
| binding.utp_napi_close(this._handle) | ||
| } else { | ||
| for (const conn of this.connections) { | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didn't understand how these lines could be missing: When you close the server, any open connections are supposed to be closed, right?
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, each connection must be closed individually
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (Same as tcp)
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The instances created in the tests are not properly torn down then, causing a lot of weird errors happen when I tried to remove this. Checking the tests for "why they fail because this is removed" is the reason this takes to long for me to continue. |
||
| conn.destroy(new Error('server closed')) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| UTP.prototype.connect = function (port, ip) { | ||
| if (!this._inited) this.bind() | ||
| if (!ip) ip = '127.0.0.1' | ||
| const conn = new Connection(this, port, ip, null, this._allowHalfOpen) | ||
| if (!isIP(ip)) conn._resolveAndConnect(port, ip) | ||
| else conn._connect(port, ip || '127.0.0.1') | ||
| return conn | ||
| return new Connection(this, port, ip, null, this._allowHalfOpen) | ||
| } | ||
|
|
||
| UTP.prototype.listen = function (port, ip, onlistening) { | ||
|
|
@@ -197,19 +196,20 @@ UTP.prototype._realloc = function () { | |
| UTP.prototype._onmessage = function (size, port, address) { | ||
| if (size < 0) { | ||
| this.emit('error', new Error('Read failed (status: ' + size + ')')) | ||
| return EMPTY | ||
| return | ||
| } | ||
|
|
||
| const message = this._buffer.slice(this._offset, this._offset += size) | ||
| this.emit('message', message, { address, family: 'IPv4', port }) | ||
|
|
||
| if (this._buffer.length - this._offset <= 65536) { | ||
| // max package buffer is 64kb and we wanna make sure we have room for that | ||
| // returning the buffer indicates to the native code that | ||
| // the buffer has changed | ||
| this._buffer = Buffer.allocUnsafe(this._buffer.length) | ||
| this._offset = 0 | ||
| return this._buffer | ||
| } | ||
|
|
||
| return EMPTY | ||
| } | ||
|
|
||
| UTP.prototype._onsend = function (send, status) { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.