Browse code

webkernel: service (withoy read processing and without write-from-fd functionality)

Dario Rodriguez authored on 17/06/2014 11:12:59
Showing 3 changed files
... ...
@@ -7,15 +7,13 @@ all: webkernel_test
7 7
 clean:
8 8
 	rm -f *.o webkernel_test
9 9
 
10
-webkernel_test: webkernel_test.o iobuf.o loglib.o parselib.o sbuf.o \
10
+webkernel_test: webkernel_test.o loglib.o parselib.o sbuf.o \
11 11
 		socklib.o webkernel.o
12
-	$(CC) $(LDFLAGS) webkernel_test.o iobuf.o loglib.o parselib.o sbuf.o \
12
+	$(CC) $(LDFLAGS) webkernel_test.o loglib.o parselib.o sbuf.o \
13 13
 		socklib.o webkernel.o -o webkernel_test
14 14
 
15 15
 webkernel_test.o:
16 16
 
17
-iobuf.o:
18
-
19 17
 loglib.o:
20 18
 
21 19
 parselib.o:
... ...
@@ -33,6 +33,10 @@ typedef struct wk_client {
33 33
         int sizeoutbufids;
34 34
         int usedoutbufids;
35 35
         int outbufids[MAXOUTBUF];
36
+        int serviced;
37
+        wk_uri uri;
38
+        int continuationactive;
39
+        int keepalive;
36 40
 } wk_client;
37 41
 
38 42
 typedef struct wk_clientblock {
... ...
@@ -64,15 +68,20 @@ typedef struct _wk {
64 68
         int sizebufblocks;
65 69
         int usedbufblocks;
66 70
         wk_bufblock *bufblocks;
71
+        void (*callback_event)(/*wk *web,int connid, wk_event event, void *userptr*/);
72
+        wk_action (*callback_http)(/*wk *web,int connid, wk_uri *uri, void *userptr*/);
73
+        wk_action (*callback_continuation)(/*wk *web,int connid, wk_uri *uri, void *userptr*/);
74
+        void *userptr;
67 75
 } _wk;
68 76
 
69
-static int wk_accept(_wk *web);
77
+static wk_client *wk_accept(_wk *web);
70 78
 static wk_client *wk_clientacquire(_wk *web);
71 79
 static int wk_clientrelease(_wk *web, int connid);
72 80
 static wk_client *wk_clientget(_wk *web, int connid);
81
+static void wk_clientserviceread(_wk *web, wk_client *client);
73 82
 
74 83
 wk *
75
-wk_init(int port, sselect *ssel, void (*callback_event)(/*wk *paramweb,int connid, wk_event event, void *userptr*/), void (*callback_http)(/*wk *paramweb,int connid, wk_uri *uri, void *userptr*/), void (*callback_continuation)(/*wk *paramweb,int connid, wk_uri *uri, void *userptr*/), void *userptr)
84
+wk_init(int port, sselect *ssel, void (*callback_event)(/*wk *web,int connid, wk_event event, void *userptr*/), wk_action (*callback_http)(/*wk *web,int connid, wk_uri *uri, void *userptr*/), wk_action (*callback_continuation)(/*wk *web,int connid, wk_uri *uri, void *userptr*/), void *userptr)
76 85
 {
77 86
         _wk *web;
78 87
         if(ssel==NULL || callback_http==NULL)
... ...
@@ -81,6 +90,10 @@ wk_init(int port, sselect *ssel, void (*callback_event)(/*wk *paramweb,int conni
81 90
                 return(NULL);
82 91
         memset(web,0,sizeof(_wk));
83 92
         web->serverfd=-1;
93
+        web->callback_event=callback_event;
94
+        web->callback_http=callback_http;
95
+        web->callback_continuation=callback_continuation;
96
+        web->userptr=userptr;
84 97
         web->ssel=ssel;
85 98
         FD_ZERO(&(web->fdset));
86 99
         if((web->serverfd=ipv4_server(port))==-1) {
... ...
@@ -90,6 +103,8 @@ wk_init(int port, sselect *ssel, void (*callback_event)(/*wk *paramweb,int conni
90 103
         sock_setunsafe(web->serverfd);
91 104
         FD_SET(web->serverfd,&(web->fdset));
92 105
         sselect_addread(ssel,web->serverfd,NULL);
106
+        if(web->callback_event!=NULL)
107
+                web->callback_event((wk *)web,-1,wke_init,web->userptr);
93 108
         return((wk *)web);
94 109
 }
95 110
 
... ...
@@ -103,6 +118,8 @@ wk_free(wk *paramweb)
103 118
         wk_bufblock *bb;
104 119
         if(web==NULL)
105 120
                 return;
121
+        if(web->callback_event!=NULL)
122
+                web->callback_event((wk *)web,-1,wke_fini,web->userptr);
106 123
         /* release server fds */
107 124
         if(web->serverfd!=-1) {
108 125
                 sselect_delread(web->ssel,web->serverfd);
... ...
@@ -137,7 +154,7 @@ wk_free(wk *paramweb)
137 154
         web->sizeclientblocks=0;
138 155
         if(web->clientblocks!=NULL)
139 156
                 free(web->clientblocks),web->clientblocks=NULL;
140
-	/* release the used sbuf */
157
+        /* release the used sbuf */
141 158
         for(i=0;i<web->usedbufblocks;i++) {
142 159
                 bb=web->bufblocks+i;
143 160
                 if(bb->bufs==NULL)
... ...
@@ -157,7 +174,15 @@ wk_free(wk *paramweb)
157 174
 wk_uri *
158 175
 wk_geturi(wk *paramweb, int connid)
159 176
 {
160
-
177
+        _wk *web=(_wk *)paramweb;
178
+        wk_client *client;
179
+        if(web==NULL)
180
+                return(NULL);
181
+        if((client=wk_clientget(web,connid))==NULL)
182
+                return(NULL);
183
+        if(client->serviced==0)
184
+                return(NULL);
185
+        return(&(client->uri));
161 186
 }
162 187
 
163 188
 int
... ...
@@ -165,8 +190,9 @@ wk_service(wk *paramweb)
165 190
 {
166 191
         int fds[FDBLOCK];
167 192
         int fd;
168
-        int n;
169
-        int i;
193
+        int n,i,k;
194
+        sbuf *in;
195
+        sbuf *out;
170 196
         _wk *web=(_wk *)paramweb;
171 197
         wk_client *client;
172 198
         if(web==NULL)
... ...
@@ -176,14 +202,66 @@ wk_service(wk *paramweb)
176 202
                         fd=fds[i];
177 203
                         if(fd==web->serverfd) {
178 204
                                 /* accept new connection */
179
-                                wk_accept(web);
205
+                                if((client=wk_accept(web))!=NULL && web->callback_event!=NULL)
206
+                                        web->callback_event((wk *)web,client->connid,wke_connected,web->userptr);
207
+                                continue; /* all done here */
208
+                        }
209
+                        if((client=(wk_client *)sselect_getuserptr(web->ssel,fd))==NULL || client->web!=(wk *)web) {
210
+                                wk_close((wk *)web,client->connid);
211
+                                continue; /* internal error */
212
+                        }
213
+                        if((in=wk_sbufget((wk *) web,client->inbufid))==NULL) {
214
+                                wk_close((wk *)web,client->connid);
215
+                                continue; /* internal error */
216
+                        }
217
+                        if(sbuf_unused(in)<=0) {
218
+                                /* no room for the new data */
219
+                                sselect_delread(web->ssel,fd);
220
+                                continue;
221
+                        }
222
+                        if(sbuf_fill(in,fd,sock_queued(fd))==0) {
223
+                                /* client has closed connection */
224
+                                wk_close((wk *) web, fd);
180 225
                                 continue;
181 226
                         }
227
+                        wk_clientserviceread(web,client);
228
+                }
229
+        }
230
+        while((n=sselect_getwritefiltered(web,&(web->fdset),fds,sizeof(fds)/sizeof(fds[0])))>0) {
231
+                for(i=0;i<n;i++) {
232
+                        fd=fds[i];
182 233
                         if((client=(wk_client *)sselect_getuserptr(web->ssel,fd))==NULL || client->web!=(wk *)web)
183 234
                                 continue; /* internal error */
184
-
235
+                        for(out=NULL,k=client->usedoutbufids-1;k>=0;k--) {
236
+                                if((out=wk_sbufget((wk *) web,client->outbufids[k]))==NULL) {
237
+                                        k=-1;
238
+                                        break; /* internal error */
239
+                                }
240
+                        }
241
+                        if(k==-1 || out==NULL) {
242
+                                wk_close((wk *)web,client->connid);
243
+                                continue; /* internal error */
244
+                        }
245
+                        sbuf_send(out,fd,sbuf_count(out));
246
+                        if(sbuf_count(out)==0) {
247
+                                if(k>0) {
248
+                                        client->usedoutbufids--;
249
+                                        wk_sbufrelease((wk *)web,client->outbufids[k]),client->outbufids[k]=-1;
250
+                                } else {
251
+                                        sselect_delwrite(web->ssel,client->fd);
252
+                                        if(client->continuationactive && web->callback_continuation!=NULL) {
253
+                                                client->continuationactive=web->callback_continuation((wk *)web,client->connid,&(client->uri),web->userptr);
254
+                                        } else {
255
+                                                client->serviced=0; /* we have finished servicing this one */
256
+                                                if(!client->keepalive)
257
+                                                        wk_close((wk *)web, fd); /* all sent */
258
+
259
+                                        }
260
+                                }
261
+                        }
185 262
                 }
186 263
         }
264
+        return(0);
187 265
 }
188 266
 
189 267
 int
... ...
@@ -219,7 +297,22 @@ wk_write(wk *paramweb, int connid, void *data, int datalen)
219 297
 int
220 298
 wk_close(wk *paramweb, int connid)
221 299
 {
222
-
300
+        _wk *web=(_wk *)paramweb;
301
+        wk_client *client;
302
+        if(web==NULL)
303
+                return(-1);
304
+        if((client=wk_clientget(web,connid))==NULL)
305
+                return(-1);
306
+        if(client->fd!=-1) {
307
+                sselect_delread(web->ssel,client->fd);
308
+                sselect_delwrite(web->ssel,client->fd);
309
+                FD_CLR(client->fd,&(web->fdset));
310
+                close(client->fd),client->fd=-1;
311
+        }
312
+        wk_clientrelease(web,connid);
313
+        if(web->callback_event!=NULL)
314
+                web->callback_event((wk *)web,connid,wke_closed,web->userptr);
315
+        return(0);
223 316
 }
224 317
 
225 318
 char *
... ...
@@ -384,21 +477,21 @@ wk_bufget(wk *paramweb, int sbufid)
384 477
 
385 478
 
386 479
 /* local functions */
387
-static int
480
+static wk_client *
388 481
 wk_accept(_wk *web)
389 482
 {
390 483
         int newfd;
391 484
         wk_client *client;
392 485
         if((newfd=sock_accept(web->serverfd))==-1)
393
-                return(-1);
486
+                return(NULL);
394 487
         if((client=wk_clientacquire(web))==NULL) {
395 488
                 close(newfd),newfd=-1;
396
-                return(-1);
489
+                return(NULL);
397 490
         }
398 491
         client->fd=newfd;
399 492
         FD_SET(client->fd,&(web->fdset));
400 493
         sselect_addread(web->ssel,client->fd,(void *)client);
401
-        return(0);
494
+        return(client);
402 495
 }
403 496
 
404 497
 static wk_client *
... ...
@@ -483,7 +576,7 @@ wk_clientrelease(_wk *web, int connid)
483 576
         web->clientblocks[numblock].acquired[j>>3]&=(~bit);
484 577
         if(client->inbufid!=-1)
485 578
                 wk_sbufrelease((wk *)web,client->inbufid),client->inbufid=-1;
486
-        for(w=0;w<client->usedoutbufids;w++) 
579
+        for(w=0;w<client->usedoutbufids;w++)
487 580
                 wk_sbufrelease((wk *)web,client->outbufids[w]),client->outbufids[w]=-1;
488 581
         memset(client,0,sizeof(wk_client));
489 582
         return(0);
... ...
@@ -505,4 +598,9 @@ wk_clientget(_wk *web, int connid)
505 598
         return(web->clientblocks[numblock].clients+j);
506 599
 }
507 600
 
601
+static void
602
+wk_clientserviceread(_wk *web, wk_client *client)
603
+{
604
+#warning TODO
605
+}
508 606
 
... ...
@@ -38,7 +38,12 @@ typedef enum wk_error {
38 38
         wkerr_notimplemented=501,
39 39
 } wk_error;
40 40
 
41
-wk *wk_init(int port, sselect *ssel, void (*callback_event)(/*wk *web,int connid, wk_event event, void *userptr*/), void (*callback_http)(/*wk *web,int connid, wk_uri *uri, void *userptr*/), void (*callback_continuation)(/*wk *web,int connid, wk_uri *uri, void *userptr*/), void *userptr);
41
+typedef enum wk_action {
42
+        wkact_finished=0,
43
+        wkact_continuation,
44
+} wk_action;
45
+
46
+wk *wk_init(int port, sselect *ssel, void (*callback_event)(/*wk *web,int connid, wk_event event, void *userptr*/), wk_action (*callback_http)(/*wk *web,int connid, wk_uri *uri, void *userptr*/), wk_action (*callback_continuation)(/*wk *web,int connid, wk_uri *uri, void *userptr*/), void *userptr);
42 47
 void wk_free(wk *web);
43 48
 
44 49
 wk_uri *wk_geturi(wk *web, int connid); /* for use in callback_event() when wke_closed */