From d8971f9fe15a8061c7c5685fd80657fc4d8b5a88 Mon Sep 17 00:00:00 2001 From: Marek Vavrusa <marek@vavrusa.com> Date: Tue, 12 Oct 2010 16:35:35 +0200 Subject: [PATCH] Reimplemented UDP handler. - No epoll and master thread - Workers are racing on recvfrom() - Workers are natively blocked by recvfrom() - Zero synchronisation overhead - Presuming recfrom() is thread-safe --- src/server/udp-handler.c | 98 ++++++---------------------------------- 1 file changed, 14 insertions(+), 84 deletions(-) diff --git a/src/server/udp-handler.c b/src/server/udp-handler.c index c354e4c19..71bdb593c 100644 --- a/src/server/udp-handler.c +++ b/src/server/udp-handler.c @@ -9,7 +9,6 @@ typedef struct sm_event { struct sm_manager* manager; int fd; - uint32_t events; void* inbuf; void* outbuf; size_t size_in; @@ -27,21 +26,16 @@ static inline void udp_handler(sm_event *ev) while(n >= 0) { // Receive data - // \todo Global I/O lock means ~ 8% overhead; recvfrom() should be thread-safe - n = recvfrom(ev->fd, ev->inbuf, ev->size_in, MSG_DONTWAIT, (struct sockaddr *)&faddr, (socklen_t *)&addrsize); - //char _str[INET_ADDRSTRLEN]; - //inet_ntop(AF_INET, &(faddr.sin_addr), _str, INET_ADDRSTRLEN); - //fprintf(stderr, "recvfrom() in %p: received %d bytes from %s:%d.\n", (void*)pthread_self(), n, _str, faddr.sin_port); - - // Socket not ready - if(n == -1 && errno == EWOULDBLOCK) { - return; - } + n = recvfrom(ev->fd, ev->inbuf, ev->size_in, 0, (struct sockaddr *)&faddr, (socklen_t *)&addrsize); + + // Error and interrupt handling + //fprintf(stderr, "recvfrom(): thread %p ret %d errno %s.\n", (void*)pthread_self(), n, strerror(errno)); + if(n <= 0 || !ev->manager->is_running) { + if(errno != EINTR && errno != 0) { + log_error("udp: reading data from the socket failed: %d - %s\n", errno, strerror(errno)); + } - // Error - if(n <= 0) { - log_error("udp: reading data from the socket failed: %d - %s\n", errno, strerror(errno)); - return; + return; } debug_sm("udp: received %d bytes.\n", n); @@ -76,55 +70,7 @@ static inline void udp_handler(sm_event *ev) void *udp_master( void *obj ) { - int worker_id = 0, nfds = 0; - sm_manager* manager = (sm_manager *)obj; - sm_worker* master = &manager->master; - - while (manager->is_running) { - - // Select next worker - sm_worker* worker = &manager->workers[worker_id]; - pthread_mutex_lock(&worker->mutex); - - // Reserve backing-store and wait - pthread_mutex_lock(&master->mutex); - int current_fds = master->events_count; - sm_reserve_events(worker, current_fds * 2); - pthread_mutex_unlock(&master->mutex); - nfds = epoll_wait(master->epfd, worker->events, current_fds, 1000); - if (nfds < 0) { - debug_sm("udp: epoll_wait: %s\n", strerror(errno)); - worker->events_count = 0; - pthread_cond_signal(&worker->wakeup); - pthread_mutex_unlock(&worker->mutex); - continue; // Keep the same worker - } - - // Signalize - worker->events_count = nfds; - pthread_cond_signal(&worker->wakeup); - pthread_mutex_unlock(&worker->mutex); - - // Next worker - worker_id = next_worker(worker_id, manager); - } - - // Wake up all workers - int last_wrkr = worker_id; - for(;;) { - - sm_worker* worker = &manager->workers[worker_id]; - pthread_mutex_lock(&worker->mutex); - worker->events_count = -1; // Shut down worker - pthread_cond_signal(&worker->wakeup); - pthread_mutex_unlock(&worker->mutex); - worker_id = next_worker(worker_id, manager); - - // Finish with the starting worker - if(worker_id == last_wrkr) - break; - } - + UNUSED(obj); return NULL; } @@ -136,31 +82,15 @@ void *udp_worker( void *obj ) sm_event event; event.manager = worker->mgr; - event.fd = 0; - event.events = 0; + event.fd = worker->mgr->sockets[0].socket; event.inbuf = buf; event.outbuf = answer; event.size_in = event.size_out = SOCKET_BUFF_SIZE; - for(;;) { - pthread_mutex_lock(&worker->mutex); - pthread_cond_wait(&worker->wakeup, &worker->mutex); - - // Check - if(worker->events_count < 0) { - pthread_mutex_unlock(&worker->mutex); - break; - } - - // Evaluate - debug_sm("udp: worker #%d polled %d events.\n", worker->epfd, worker->events_count); - for(int i = 0; i < worker->events_count; ++i) { - event.fd = worker->events[i].data.fd; - event.events = worker->events[i].events; - udp_handler(&event); - } + while(worker->mgr->is_running) { - pthread_mutex_unlock(&worker->mutex); + // Handle UDP socket + udp_handler(&event); } debug_sm("udp: worker #%d finished.\n", worker->epfd); -- GitLab