/Users/alexjokela/projects/lattice/src/channel.c
Line | Count | Source |
1 | | #include "channel.h" |
2 | | #include "value.h" |
3 | | #include <stdlib.h> |
4 | | #include <string.h> |
5 | | |
6 | 42 | LatChannel *channel_new(void) { |
7 | 42 | LatChannel *ch = calloc(1, sizeof(LatChannel)); |
8 | 42 | ch->buffer = lat_vec_new(sizeof(LatValue)); |
9 | 42 | ch->closed = false; |
10 | 42 | ch->refcount = 1; |
11 | 42 | #ifndef __EMSCRIPTEN__ |
12 | 42 | pthread_mutex_init(&ch->mutex, NULL); |
13 | 42 | pthread_cond_init(&ch->cond_notempty, NULL); |
14 | 42 | #endif |
15 | 42 | return ch; |
16 | 42 | } |
17 | | |
18 | 179 | void channel_retain(LatChannel *ch) { |
19 | 179 | __atomic_add_fetch(&ch->refcount, 1, __ATOMIC_SEQ_CST); |
20 | 179 | } |
21 | | |
22 | 215 | void channel_release(LatChannel *ch) { |
23 | 215 | size_t prev = __atomic_sub_fetch(&ch->refcount, 1, __ATOMIC_SEQ_CST); |
24 | 215 | if (prev == 0) { |
25 | | /* Free any remaining buffered values */ |
26 | 36 | for (size_t i = 0; i < ch->buffer.len; i++) { |
27 | 0 | LatValue *v = lat_vec_get(&ch->buffer, i); |
28 | 0 | value_free(v); |
29 | 0 | } |
30 | 36 | lat_vec_free(&ch->buffer); |
31 | 36 | #ifndef __EMSCRIPTEN__ |
32 | 36 | pthread_mutex_destroy(&ch->mutex); |
33 | 36 | pthread_cond_destroy(&ch->cond_notempty); |
34 | 36 | #endif |
35 | 36 | free(ch); |
36 | 36 | } |
37 | 215 | } |
38 | | |
39 | 27 | bool channel_send(LatChannel *ch, LatValue val) { |
40 | 27 | #ifndef __EMSCRIPTEN__ |
41 | 27 | pthread_mutex_lock(&ch->mutex); |
42 | 27 | #endif |
43 | 27 | if (ch->closed) { |
44 | 0 | #ifndef __EMSCRIPTEN__ |
45 | 0 | pthread_mutex_unlock(&ch->mutex); |
46 | 0 | #endif |
47 | 0 | value_free(&val); |
48 | 0 | return false; |
49 | 0 | } |
50 | 27 | lat_vec_push(&ch->buffer, &val); |
51 | 27 | #ifndef __EMSCRIPTEN__ |
52 | 27 | pthread_cond_signal(&ch->cond_notempty); |
53 | | /* Wake any select waiters */ |
54 | 27 | for (LatSelectWaiter *w = ch->waiters; w; w = w->next) { |
55 | 0 | pthread_mutex_lock(w->mutex); |
56 | 0 | pthread_cond_signal(w->cond); |
57 | 0 | pthread_mutex_unlock(w->mutex); |
58 | 0 | } |
59 | 27 | pthread_mutex_unlock(&ch->mutex); |
60 | 27 | #endif |
61 | 27 | return true; |
62 | 27 | } |
63 | | |
64 | 24 | LatValue channel_recv(LatChannel *ch, bool *ok) { |
65 | 24 | #ifndef __EMSCRIPTEN__ |
66 | 24 | pthread_mutex_lock(&ch->mutex); |
67 | 24 | while (ch->buffer.len == 0 && !ch->closed) { |
68 | 0 | pthread_cond_wait(&ch->cond_notempty, &ch->mutex); |
69 | 0 | } |
70 | 24 | #endif |
71 | 24 | if (ch->buffer.len == 0) { |
72 | | /* closed + empty */ |
73 | 3 | #ifndef __EMSCRIPTEN__ |
74 | 3 | pthread_mutex_unlock(&ch->mutex); |
75 | 3 | #endif |
76 | 3 | *ok = false; |
77 | 3 | return value_unit(); |
78 | 3 | } |
79 | | /* Shift front element */ |
80 | 21 | LatValue val; |
81 | 21 | memcpy(&val, lat_vec_get(&ch->buffer, 0), sizeof(LatValue)); |
82 | | /* Shift remaining elements forward */ |
83 | 21 | if (ch->buffer.len > 1) { |
84 | 6 | memmove(ch->buffer.data, |
85 | 6 | (char *)ch->buffer.data + ch->buffer.elem_size, |
86 | 6 | (ch->buffer.len - 1) * ch->buffer.elem_size); |
87 | 6 | } |
88 | 21 | ch->buffer.len--; |
89 | 21 | #ifndef __EMSCRIPTEN__ |
90 | 21 | pthread_mutex_unlock(&ch->mutex); |
91 | 21 | #endif |
92 | 21 | *ok = true; |
93 | 21 | return val; |
94 | 24 | } |
95 | | |
96 | 9 | void channel_close(LatChannel *ch) { |
97 | 9 | #ifndef __EMSCRIPTEN__ |
98 | 9 | pthread_mutex_lock(&ch->mutex); |
99 | 9 | #endif |
100 | 9 | ch->closed = true; |
101 | 9 | #ifndef __EMSCRIPTEN__ |
102 | 9 | pthread_cond_broadcast(&ch->cond_notempty); |
103 | | /* Wake any select waiters */ |
104 | 9 | for (LatSelectWaiter *w = ch->waiters; w; w = w->next) { |
105 | 0 | pthread_mutex_lock(w->mutex); |
106 | 0 | pthread_cond_signal(w->cond); |
107 | 0 | pthread_mutex_unlock(w->mutex); |
108 | 0 | } |
109 | 9 | pthread_mutex_unlock(&ch->mutex); |
110 | 9 | #endif |
111 | 9 | } |
112 | | |
113 | 18 | bool channel_try_recv(LatChannel *ch, LatValue *out, bool *closed_out) { |
114 | 18 | #ifndef __EMSCRIPTEN__ |
115 | 18 | pthread_mutex_lock(&ch->mutex); |
116 | 18 | #endif |
117 | 18 | if (ch->buffer.len == 0) { |
118 | 12 | if (closed_out) *closed_out = ch->closed; |
119 | 12 | #ifndef __EMSCRIPTEN__ |
120 | 12 | pthread_mutex_unlock(&ch->mutex); |
121 | 12 | #endif |
122 | 12 | return false; |
123 | 12 | } |
124 | 18 | memcpy(out, lat_vec_get(&ch->buffer, 0), sizeof(LatValue)); |
125 | 6 | if (ch->buffer.len > 1) { |
126 | 0 | memmove(ch->buffer.data, |
127 | 0 | (char *)ch->buffer.data + ch->buffer.elem_size, |
128 | 0 | (ch->buffer.len - 1) * ch->buffer.elem_size); |
129 | 0 | } |
130 | 6 | ch->buffer.len--; |
131 | 6 | if (closed_out) *closed_out = false; |
132 | 6 | #ifndef __EMSCRIPTEN__ |
133 | 6 | pthread_mutex_unlock(&ch->mutex); |
134 | 6 | #endif |
135 | 6 | return true; |
136 | 18 | } |
137 | | |
138 | 0 | void channel_add_waiter(LatChannel *ch, LatSelectWaiter *w) { |
139 | 0 | #ifndef __EMSCRIPTEN__ |
140 | 0 | pthread_mutex_lock(&ch->mutex); |
141 | 0 | w->next = ch->waiters; |
142 | 0 | ch->waiters = w; |
143 | 0 | pthread_mutex_unlock(&ch->mutex); |
144 | | #else |
145 | | (void)ch; (void)w; |
146 | | #endif |
147 | 0 | } |
148 | | |
149 | 0 | void channel_remove_waiter(LatChannel *ch, LatSelectWaiter *w) { |
150 | 0 | #ifndef __EMSCRIPTEN__ |
151 | 0 | pthread_mutex_lock(&ch->mutex); |
152 | 0 | LatSelectWaiter **pp = &ch->waiters; |
153 | 0 | while (*pp) { |
154 | 0 | if (*pp == w) { *pp = w->next; break; } |
155 | 0 | pp = &(*pp)->next; |
156 | 0 | } |
157 | 0 | pthread_mutex_unlock(&ch->mutex); |
158 | | #else |
159 | | (void)ch; (void)w; |
160 | | #endif |
161 | 0 | } |