/*
* Authored by Alex Hultman, 2018-2019.
* Intellectual property of third-party.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "libusockets.h"
#include "internal/internal.h"
#include <stdlib.h>
#if defined(LIBUS_USE_EPOLL) || defined(LIBUS_USE_KQUEUE)
void Bun__internal_dispatch_ready_poll(void* loop, void* poll);
// void Bun__internal_dispatch_ready_poll(void* loop, void* poll) {}
#ifndef WIN32
/* Cannot include this one on Windows */
#include <unistd.h>
#include <stdint.h>
#endif
void us_loop_run_bun_tick(struct us_loop_t *loop, int64_t timeoutMs, void*);
/* Pointer tags are used to indicate a Bun pointer versus a uSockets pointer */
#define UNSET_BITS_49_UNTIL_64 0x0000FFFFFFFFFFFF
#define CLEAR_POINTER_TAG(p) ((void *) ((uintptr_t) (p) & UNSET_BITS_49_UNTIL_64))
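/* A tagged pointer has one of its upper 16 bits set; CLEAR_POINTER_TAG recovers the plain
 * 48-bit address. The dispatch loops below use CLEAR_POINTER_TAG(poll) != poll to detect
 * Bun-owned entries and hand them to Bun__internal_dispatch_ready_poll. */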
#define LIKELY(cond) __builtin_expect((uint64_t)(void*)(cond), 1)
#define UNLIKELY(cond) __builtin_expect((uint64_t)(void*)(cond), 0)
#ifdef LIBUS_USE_EPOLL
#define GET_READY_POLL(loop, index) (struct us_poll_t *) loop->ready_polls[index].data.ptr
#define SET_READY_POLL(loop, index, poll) loop->ready_polls[index].data.ptr = (void*)poll
#else
#define GET_READY_POLL(loop, index) (struct us_poll_t *) loop->ready_polls[index].udata
#define SET_READY_POLL(loop, index, poll) loop->ready_polls[index].udata = (uint64_t)poll
#endif
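/* GET_READY_POLL/SET_READY_POLL hide where the poll pointer is stored in a ready event:
 * epoll_event.data.ptr under epoll, kevent64_s.udata under kqueue. */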
/* Loop */
void us_loop_free(struct us_loop_t *loop) {
us_internal_loop_data_free(loop);
close(loop->fd);
us_free(loop);
}
/* Poll */
struct us_poll_t *us_create_poll(struct us_loop_t *loop, int fallthrough, unsigned int ext_size) {
if (!fallthrough) {
loop->num_polls++;
}
return CLEAR_POINTER_TAG(us_malloc(sizeof(struct us_poll_t) + ext_size));
}
/* Todo: this one should be us_internal_poll_free */
void us_poll_free(struct us_poll_t *p, struct us_loop_t *loop) {
loop->num_polls--;
us_free(p);
}
void *us_poll_ext(struct us_poll_t *p) {
return p + 1;
}
/* Todo: why have us_create_poll AND us_poll_init!? libuv legacy! */
void us_poll_init(struct us_poll_t *p, LIBUS_SOCKET_DESCRIPTOR fd, int poll_type) {
p->state.fd = fd;
p->state.poll_type = poll_type;
}
int us_poll_events(struct us_poll_t *p) {
return ((p->state.poll_type & POLL_TYPE_POLLING_IN) ? LIBUS_SOCKET_READABLE : 0) | ((p->state.poll_type & POLL_TYPE_POLLING_OUT) ? LIBUS_SOCKET_WRITABLE : 0);
}
LIBUS_SOCKET_DESCRIPTOR us_poll_fd(struct us_poll_t *p) {
return p->state.fd;
}
/* Returns any of listen socket, socket, shut down socket or callback */
int us_internal_poll_type(struct us_poll_t *p) {
return p->state.poll_type & 3;
}
/* Bug: this doesn't really SET the type, it reads and modifies it, so poll_type must be initialized first! */
void us_internal_poll_set_type(struct us_poll_t *p, int poll_type) {
p->state.poll_type = poll_type | (p->state.poll_type & 12);
}
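/* Note: the low 2 bits of poll_type hold the poll type, while bits 2-3 (the "& 12" above)
 * hold the POLL_TYPE_POLLING_IN/OUT flags set by us_poll_start and us_poll_change. */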
/* Timer */
void *us_timer_ext(struct us_timer_t *timer) {
return ((struct us_internal_callback_t *) timer) + 1;
}
struct us_loop_t *us_timer_loop(struct us_timer_t *t) {
struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) t;
return internal_cb->loop;
}
/* Loop */
struct us_loop_t *us_create_loop(void *hint, void (*wakeup_cb)(struct us_loop_t *loop), void (*pre_cb)(struct us_loop_t *loop), void (*post_cb)(struct us_loop_t *loop), unsigned int ext_size) {
struct us_loop_t *loop = (struct us_loop_t *) us_malloc(sizeof(struct us_loop_t) + ext_size);
loop->num_polls = 0;
/* These could be accessed if we close a poll before starting the loop */
loop->num_ready_polls = 0;
loop->current_ready_poll = 0;
loop->bun_polls = 0;
#ifdef LIBUS_USE_EPOLL
loop->fd = epoll_create1(EPOLL_CLOEXEC);
#else
loop->fd = kqueue();
#endif
us_internal_loop_data_init(loop, wakeup_cb, pre_cb, post_cb);
return loop;
}
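/* Core blocking loop: wait for ready polls, hand Bun-tagged entries to Bun, dispatch the
 * rest by their poll type, and emit the pre/post callbacks around every iteration. */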
void us_loop_run(struct us_loop_t *loop) {
us_loop_integrate(loop);
/* While we have non-fallthrough polls we shouldn't fall through */
while (loop->num_polls) {
/* Emit pre callback */
us_internal_loop_pre(loop);
/* Fetch ready polls */
#ifdef LIBUS_USE_EPOLL
loop->num_ready_polls = epoll_wait(loop->fd, loop->ready_polls, 1024, -1);
#else
loop->num_ready_polls = kevent64(loop->fd, NULL, 0, loop->ready_polls, 1024, 0, NULL);
#endif
/* Iterate ready polls, dispatching them by type */
for (loop->current_ready_poll = 0; loop->current_ready_poll < loop->num_ready_polls; loop->current_ready_poll++) {
struct us_poll_t *poll = GET_READY_POLL(loop, loop->current_ready_poll);
/* Any ready poll marked with nullptr will be ignored */
if (LIKELY(poll)) {
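/* A tagged pointer belongs to Bun's event loop integration, not to uSockets */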
if (CLEAR_POINTER_TAG(poll) != poll) {
Bun__internal_dispatch_ready_poll(loop, poll);
continue;
}
#ifdef LIBUS_USE_EPOLL
int events = loop->ready_polls[loop->current_ready_poll].events;
int error = loop->ready_polls[loop->current_ready_poll].events & (EPOLLERR | EPOLLHUP);
#else
/* EVFILT_READ, EVFILT_TIMER, EVFILT_USER and EVFILT_MACHPORT are all mapped to LIBUS_SOCKET_READABLE */
int events = LIBUS_SOCKET_READABLE;
if (loop->ready_polls[loop->current_ready_poll].filter == EVFILT_WRITE) {
events = LIBUS_SOCKET_WRITABLE;
}
int error = loop->ready_polls[loop->current_ready_poll].flags & (EV_ERROR | EV_EOF);
#endif
/* Always filter all polls by what they actually poll for (callback polls always poll for readable) */
events &= us_poll_events(poll);
if (events || error) {
us_internal_dispatch_ready_poll(poll, error, events);
}
}
}
/* Emit post callback */
us_internal_loop_post(loop);
}
}
void bun_on_tick_before(void* ctx);
void bun_on_tick_after(void* ctx);
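/* Bun-driven variant of us_loop_run: performs a single iteration, waiting at most timeoutMs
 * for ready polls (indefinitely when timeoutMs <= 0), and wraps the wait in
 * bun_on_tick_before/bun_on_tick_after when a tick callback context is given. */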
void us_loop_run_bun_tick(struct us_loop_t *loop, int64_t timeoutMs, void* tickCallbackContext) {
us_loop_integrate(loop);
if (loop->num_polls == 0)
return;
if (tickCallbackContext) {
bun_on_tick_before(tickCallbackContext);
}
/* Emit pre callback */
us_internal_loop_pre(loop);
/* Fetch ready polls */
#ifdef LIBUS_USE_EPOLL
if (timeoutMs > 0) {
loop->num_ready_polls = epoll_wait(loop->fd, loop->ready_polls, 1024, (int)timeoutMs);
} else {
loop->num_ready_polls = epoll_wait(loop->fd, loop->ready_polls, 1024, -1);
}
#else
if (timeoutMs > 0) {
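/* Split the millisecond timeout into the seconds/nanoseconds timespec kevent64 expects */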
struct timespec ts = {0, 0};
ts.tv_sec = timeoutMs / 1000;
ts.tv_nsec = (timeoutMs % 1000) * 1000000;
loop->num_ready_polls = kevent64(loop->fd, NULL, 0, loop->ready_polls, 1024, 0, &ts);
} else {
loop->num_ready_polls = kevent64(loop->fd, NULL, 0, loop->ready_polls, 1024, 0, NULL);
}
#endif
if (tickCallbackContext) {
bun_on_tick_after(tickCallbackContext);
}
/* Iterate ready polls, dispatching them by type */
for (loop->current_ready_poll = 0; loop->current_ready_poll < loop->num_ready_polls; loop->current_ready_poll++) {
struct us_poll_t *poll = GET_READY_POLL(loop, loop->current_ready_poll);
/* Any ready poll marked with nullptr will be ignored */
if (LIKELY(poll)) {
if (CLEAR_POINTER_TAG(poll) != poll) {
Bun__internal_dispatch_ready_poll(loop, poll);
continue;
}
#ifdef LIBUS_USE_EPOLL
int events = loop->ready_polls[loop->current_ready_poll].events;
int error = loop->ready_polls[loop->current_ready_poll].events & (EPOLLERR | EPOLLHUP);
#else
/* EVFILT_READ, EVFILT_TIMER, EVFILT_USER and EVFILT_MACHPORT are all mapped to LIBUS_SOCKET_READABLE */
int events = LIBUS_SOCKET_READABLE;
if (loop->ready_polls[loop->current_ready_poll].filter == EVFILT_WRITE) {
events = LIBUS_SOCKET_WRITABLE;
}
int error = loop->ready_polls[loop->current_ready_poll].flags & (EV_ERROR | EV_EOF);
#endif
/* Always filter all polls by what they actually poll for (callback polls always poll for readable) */
events &= us_poll_events(poll);
if (events || error) {
us_internal_dispatch_ready_poll(poll, error, events);
}
}
}
/* Emit post callback */
us_internal_loop_post(loop);
}
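/* When a poll is reallocated or stopped we must also patch any entries for it that are still
 * pending in the ready list, otherwise the dispatch loop above could touch a stale (or
 * already freed) pointer. */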
void us_internal_loop_update_pending_ready_polls(struct us_loop_t *loop, struct us_poll_t *old_poll, struct us_poll_t *new_poll, int old_events, int new_events) {
#ifdef LIBUS_USE_EPOLL
/* Epoll only has one ready poll per poll */
int num_entries_possibly_remaining = 1;
#else
/* The ready list may contain the same poll twice under kqueue, as one poll may hold two filters */
int num_entries_possibly_remaining = 2;//((old_events & LIBUS_SOCKET_READABLE) ? 1 : 0) + ((old_events & LIBUS_SOCKET_WRITABLE) ? 1 : 0);
#endif
/* Todo: for kqueue, if we track things in us_change_poll we could take a fast path with no seeking
 * when the current ready poll is us AND we only poll for one thing */
for (int i = loop->current_ready_poll; i < loop->num_ready_polls && num_entries_possibly_remaining; i++) {
if (GET_READY_POLL(loop, i) == old_poll) {
// If the new events no longer include this poll's ready events we could clear the entry here, but that case is filtered out later anyway
SET_READY_POLL(loop, i, new_poll);
num_entries_possibly_remaining--;
}
}
}
/* Poll */
#ifdef LIBUS_USE_KQUEUE
/* Helper function for setting or updating EVFILT_READ and EVFILT_WRITE */
int kqueue_change(int kqfd, int fd, int old_events, int new_events, void *user_data) {
struct kevent64_s change_list[2];
int change_length = 0;
/* Do they differ in readable? */
if ((new_events & LIBUS_SOCKET_READABLE) != (old_events & LIBUS_SOCKET_READABLE)) {
EV_SET64(&change_list[change_length++], fd, EVFILT_READ, (new_events & LIBUS_SOCKET_READABLE) ? EV_ADD : EV_DELETE, 0, 0, (uint64_t)(void*)user_data, 0, 0);
}
/* Do they differ in writable? */
if ((new_events & LIBUS_SOCKET_WRITABLE) != (old_events & LIBUS_SOCKET_WRITABLE)) {
EV_SET64(&change_list[change_length++], fd, EVFILT_WRITE, (new_events & LIBUS_SOCKET_WRITABLE) ? EV_ADD : EV_DELETE, 0, 0, (uint64_t)(void*)user_data, 0, 0);
}
int ret = kevent64(kqfd, change_list, change_length, NULL, 0, 0, NULL);
// ret should be 0 in most cases (not guaranteed when removing async)
return ret;
}
#endif
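/* us_realloc may move the poll in memory; if it does and the poll was actively polling, the
 * kernel registration is refreshed with the new address and any pending ready-list entries
 * are patched to point at it. */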
struct us_poll_t *us_poll_resize(struct us_poll_t *p, struct us_loop_t *loop, unsigned int ext_size) {
int events = us_poll_events(p);
struct us_poll_t *new_p = us_realloc(p, sizeof(struct us_poll_t) + ext_size);
if (p != new_p && events) {
#ifdef LIBUS_USE_EPOLL
/* Hack: forcefully update poll by stripping away already set events */
new_p->state.poll_type = us_internal_poll_type(new_p);
us_poll_change(new_p, loop, events);
#else
/* Forcefully update poll by resetting them with new_p as user data */
kqueue_change(loop->fd, new_p->state.fd, 0, events, new_p);
#endif
/* This is needed for epoll also (us_change_poll doesn't update the old poll) */
us_internal_loop_update_pending_ready_polls(loop, p, new_p, events, events);
}
return new_p;
}
void us_poll_start(struct us_poll_t *p, struct us_loop_t *loop, int events) {
p->state.poll_type = us_internal_poll_type(p) | ((events & LIBUS_SOCKET_READABLE) ? POLL_TYPE_POLLING_IN : 0) | ((events & LIBUS_SOCKET_WRITABLE) ? POLL_TYPE_POLLING_OUT : 0);
#ifdef LIBUS_USE_EPOLL
struct epoll_event event;
event.events = events;
event.data.ptr = p;
epoll_ctl(loop->fd, EPOLL_CTL_ADD, p->state.fd, &event);
#else
kqueue_change(loop->fd, p->state.fd, 0, events, p);
#endif
}
void us_poll_change(struct us_poll_t *p, struct us_loop_t *loop, int events) {
int old_events = us_poll_events(p);
if (old_events != events) {
p->state.poll_type = us_internal_poll_type(p) | ((events & LIBUS_SOCKET_READABLE) ? POLL_TYPE_POLLING_IN : 0) | ((events & LIBUS_SOCKET_WRITABLE) ? POLL_TYPE_POLLING_OUT : 0);
#ifdef LIBUS_USE_EPOLL
struct epoll_event event;
event.events = events;
event.data.ptr = p;
epoll_ctl(loop->fd, EPOLL_CTL_MOD, p->state.fd, &event);
#else
kqueue_change(loop->fd, p->state.fd, old_events, events, p);
#endif
/* Set all removed events to null-polls in pending ready poll list */
//us_internal_loop_update_pending_ready_polls(loop, p, p, old_events, events);
}
}
void us_poll_stop(struct us_poll_t *p, struct us_loop_t *loop) {
int old_events = us_poll_events(p);
int new_events = 0;
#ifdef LIBUS_USE_EPOLL
struct epoll_event event;
epoll_ctl(loop->fd, EPOLL_CTL_DEL, p->state.fd, &event);
#else
if (old_events) {
kqueue_change(loop->fd, p->state.fd, old_events, new_events, NULL);
}
#endif
/* Disable any instance of us in the pending ready poll list */
us_internal_loop_update_pending_ready_polls(loop, p, 0, old_events, new_events);
}
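/* Drains the eventfd/timerfd behind a callback poll by reading its 8-byte counter, which is
 * returned. Under kqueue there is no underlying fd to drain, so 0 is returned. */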
unsigned int us_internal_accept_poll_event(struct us_poll_t *p) {
#ifdef LIBUS_USE_EPOLL
int fd = us_poll_fd(p);
uint64_t buf;
int read_length = read(fd, &buf, 8);
(void)read_length;
return buf;
#else
/* Kqueue has no underlying FD for timers or user events */
return 0;
#endif
}
/* Timer */
#ifdef LIBUS_USE_EPOLL
struct us_timer_t *us_create_timer(struct us_loop_t *loop, int fallthrough, unsigned int ext_size) {
struct us_poll_t *p = us_create_poll(loop, fallthrough, sizeof(struct us_internal_callback_t) + ext_size);
int timerfd = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK | TFD_CLOEXEC);
if (timerfd == -1) {
return NULL;
}
us_poll_init(p, timerfd, POLL_TYPE_CALLBACK);
struct us_internal_callback_t *cb = (struct us_internal_callback_t *) p;
cb->loop = loop;
cb->cb_expects_the_loop = 0;
cb->leave_poll_ready = 0;
return (struct us_timer_t *) cb;
}
#else
struct us_timer_t *us_create_timer(struct us_loop_t *loop, int fallthrough, unsigned int ext_size) {
struct us_internal_callback_t *cb = us_malloc(sizeof(struct us_internal_callback_t) + ext_size);
cb->loop = loop;
cb->cb_expects_the_loop = 0;
cb->leave_poll_ready = 0;
/* Bug: us_internal_poll_set_type does not SET the type, it only CHANGES it */
cb->p.state.poll_type = POLL_TYPE_POLLING_IN;
us_internal_poll_set_type((struct us_poll_t *) cb, POLL_TYPE_CALLBACK);
if (!fallthrough) {
loop->num_polls++;
}
return (struct us_timer_t *) cb;
}
#endif
#ifdef LIBUS_USE_EPOLL
void us_timer_close(struct us_timer_t *timer, int fallthrough) {
struct us_internal_callback_t *cb = (struct us_internal_callback_t *) timer;
us_poll_stop(&cb->p, cb->loop);
close(us_poll_fd(&cb->p));
/* (regular) sockets are the only polls which are not freed immediately */
if (fallthrough) {
us_free(timer);
} else {
us_poll_free((struct us_poll_t *) timer, cb->loop);
}
}
void us_timer_set(struct us_timer_t *t, void (*cb)(struct us_timer_t *t), int ms, int repeat_ms) {
struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) t;
internal_cb->cb = (void (*)(struct us_internal_callback_t *)) cb;
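/* struct itimerspec takes the repeat interval first and the initial expiration second */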
struct itimerspec timer_spec = {
{repeat_ms / 1000, (long) (repeat_ms % 1000) * (long) 1000000},
{ms / 1000, (long) (ms % 1000) * (long) 1000000}
};
timerfd_settime(us_poll_fd((struct us_poll_t *) t), 0, &timer_spec, NULL);
us_poll_start((struct us_poll_t *) t, internal_cb->loop, LIBUS_SOCKET_READABLE);
}
#else
void us_timer_close(struct us_timer_t *timer, int fallthrough) {
struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) timer;
struct kevent64_s event;
EV_SET64(&event, (uint64_t) (void*) internal_cb, EVFILT_TIMER, EV_DELETE, 0, 0, (uint64_t)internal_cb, 0, 0);
kevent64(internal_cb->loop->fd, &event, 1, NULL, 0, 0, NULL);
/* (regular) sockets are the only polls which are not freed immediately */
if (fallthrough) {
us_free(timer);
} else {
us_poll_free((struct us_poll_t *) timer, internal_cb->loop);
}
}
void us_timer_set(struct us_timer_t *t, void (*cb)(struct us_timer_t *t), int ms, int repeat_ms) {
struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) t;
internal_cb->cb = (void (*)(struct us_internal_callback_t *)) cb;
/* Bug: a kqueue EVFILT_TIMER only carries a single period, so repeat_ms must be the same as ms, or 0 */
struct kevent64_s event;
uint64_t ptr = (uint64_t)(void*)internal_cb;
EV_SET64(&event, ptr, EVFILT_TIMER, EV_ADD | (repeat_ms ? 0 : EV_ONESHOT), 0, ms, (uint64_t)internal_cb, 0, 0);
kevent64(internal_cb->loop->fd, &event, 1, NULL, 0, 0, NULL);
}
#endif
/* Async (internal helper for loop's wakeup feature) */
#ifdef LIBUS_USE_EPOLL
struct us_internal_async *us_internal_create_async(struct us_loop_t *loop, int fallthrough, unsigned int ext_size) {
struct us_poll_t *p = us_create_poll(loop, fallthrough, sizeof(struct us_internal_callback_t) + ext_size);
us_poll_init(p, eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC), POLL_TYPE_CALLBACK);
struct us_internal_callback_t *cb = (struct us_internal_callback_t *) p;
cb->loop = loop;
cb->cb_expects_the_loop = 1;
cb->leave_poll_ready = 0;
return (struct us_internal_async *) cb;
}
// Identical to the timer close code; this could be shared for all "callback type" polls
void us_internal_async_close(struct us_internal_async *a) {
struct us_internal_callback_t *cb = (struct us_internal_callback_t *) a;
us_poll_stop(&cb->p, cb->loop);
close(us_poll_fd(&cb->p));
/* (regular) sockets are the only polls which are not freed immediately */
us_poll_free((struct us_poll_t *) a, cb->loop);
}
void us_internal_async_set(struct us_internal_async *a, void (*cb)(struct us_internal_async *)) {
struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) a;
internal_cb->cb = (void (*)(struct us_internal_callback_t *)) cb;
us_poll_start((struct us_poll_t *) a, internal_cb->loop, LIBUS_SOCKET_READABLE);
}
void us_internal_async_wakeup(struct us_internal_async *a) {
uint64_t one = 1;
int written = write(us_poll_fd((struct us_poll_t *) a), &one, 8);
(void)written;
}
#else
#define MACHPORT_BUF_LEN 1024
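/* On macOS the loop wakeup is built on a mach port watched through EVFILT_MACHPORT instead
 * of an eventfd; kevent64 receives incoming messages directly into machport_buf. */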
struct us_internal_async *us_internal_create_async(struct us_loop_t *loop, int fallthrough, unsigned int ext_size) {
struct us_internal_callback_t *cb = us_malloc(sizeof(struct us_internal_callback_t) + ext_size);
cb->loop = loop;
cb->cb_expects_the_loop = 1;
cb->leave_poll_ready = 0;
/* Bug: us_internal_poll_set_type does not SET the type, it only CHANGES it */
cb->p.state.poll_type = POLL_TYPE_POLLING_IN;
us_internal_poll_set_type((struct us_poll_t *) cb, POLL_TYPE_CALLBACK);
if (!fallthrough) {
loop->num_polls++;
}
cb->machport_buf = us_malloc(MACHPORT_BUF_LEN);
kern_return_t kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &cb->port);
if (UNLIKELY(kr != KERN_SUCCESS)) {
return NULL;
}
return (struct us_internal_async *) cb;
}
// Identical to the timer close code; this could be shared for all "callback type" polls
void us_internal_async_close(struct us_internal_async *a) {
struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) a;
struct kevent64_s event;
uint64_t ptr = (uint64_t)(void*)internal_cb;
EV_SET64(&event, ptr, EVFILT_MACHPORT, EV_DELETE, 0, 0, (uint64_t)(void*)internal_cb, 0,0);
kevent64(internal_cb->loop->fd, &event, 1, NULL, 0, 0, NULL);
mach_port_deallocate(mach_task_self(), internal_cb->port);
us_free(internal_cb->machport_buf);
/* (regular) sockets are the only polls which are not freed immediately */
us_poll_free((struct us_poll_t *) a, internal_cb->loop);
}
void us_internal_async_set(struct us_internal_async *a, void (*cb)(struct us_internal_async *)) {
struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) a;
internal_cb->cb = (void (*)(struct us_internal_callback_t *)) cb;
// EVFILT_MACHPORT benchmarks faster than EVFILT_USER when using multiple threads.
// Very old versions of macOS required these to be port sets instead of ports,
// but that is no longer the case.
// There are not many examples on the internet of using mach ports this way;
// one can be found in Chromium's codebase.
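// With MACH_RCV_MSG set, kevent64 receives the message as part of the event;
// ext[0] and ext[1] supply the receive buffer and its size.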
struct kevent64_s event;
event.ident = internal_cb->port;
event.filter = EVFILT_MACHPORT;
event.flags = EV_ADD | EV_ENABLE;
event.fflags = MACH_RCV_MSG | MACH_RCV_OVERWRITE;
event.ext[0] = (uint64_t)(void*)internal_cb->machport_buf;
event.ext[1] = MACHPORT_BUF_LEN;
event.udata = (uint64_t)(void*)internal_cb;
int ret = kevent64(internal_cb->loop->fd, &event, 1, NULL, 0, 0, NULL);
if (UNLIKELY(ret == -1)) {
abort();
}
}
void us_internal_async_wakeup(struct us_internal_async *a) {
struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) a;
mach_msg_empty_send_t message;
memset(&message, 0, sizeof(message));
message.header.msgh_size = sizeof(message);
message.header.msgh_bits = MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_MAKE_SEND_ONCE);
message.header.msgh_remote_port = internal_cb->port;
kern_return_t kr = mach_msg_send(&message.header);
if (kr != KERN_SUCCESS) {
// If us_internal_async_wakeup is called by other threads faster than the pump
// can dispatch work, the kernel message queue for the wakeup port can fill up.
// The kernel does return a SEND_ONCE right in the case of failure, which must
// be destroyed to avoid leaking it.
mach_msg_destroy(&message.header);
}
}
#endif
#endif