/* $NetBSD: isns_task.c,v 1.1.1.1 2011/01/16 01:22:50 agc Exp $ */ /*- * Copyright (c) 2004,2009 The NetBSD Foundation, Inc. * All rights reserved. * * This code is derived from software contributed to The NetBSD Foundation * by Wasabi Systems, Inc. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #include __RCSID("$NetBSD: isns_task.c,v 1.1.1.1 2011/01/16 01:22:50 agc Exp $"); /* * isns_task.c */ #include #include #include #include "isns.h" #include "isns_config.h" static struct iovec write_buf[2 + (ISNS_MAX_PDU_PAYLOAD / ISNS_BUF_SIZE) + ((ISNS_MAX_PDU_PAYLOAD % ISNS_BUF_SIZE) != 0)]; static isns_task_handler isns_task_discover_server; static isns_task_handler isns_task_reconnect_server; static isns_task_handler isns_task_send_pdu; static isns_task_handler isns_task_init_socket_io; static isns_task_handler isns_task_init_refresh; void isns_run_task(struct isns_task_s *task_p) { static isns_task_handler *task_dispatch_table[ISNS_NUM_TASKS] = { isns_task_discover_server, isns_task_reconnect_server, isns_task_send_pdu, isns_task_init_socket_io, isns_task_init_refresh }; DBG("isns_run_task: task_type=%d\n", task_p->task_type); if (task_p->task_type < ARRAY_ELEMS(task_dispatch_table)) task_dispatch_table[task_p->task_type](task_p); else DBG("isns_run_task: unknown task type=%d\n", task_p->task_type); } int isns_wait_task(struct isns_task_s *task_p, const struct timespec *timeout_p) { struct timeval tv_now; struct timespec ts_abstime; int rval; DBG("isns_wait_task: waitable=%d\n", task_p->waitable); if (!task_p->waitable) return EPERM; pthread_mutex_lock(&task_p->wait_mutex); if (timeout_p == NULL) { rval = pthread_cond_wait(&task_p->wait_condvar, &task_p->wait_mutex); } else { gettimeofday(&tv_now, NULL); TIMEVAL_TO_TIMESPEC(&tv_now, &ts_abstime); timespecadd(&ts_abstime, timeout_p, &ts_abstime); rval = pthread_cond_timedwait(&task_p->wait_condvar, &task_p->wait_mutex, &ts_abstime); } pthread_mutex_unlock(&task_p->wait_mutex); isns_free_task(task_p); DBG("isns_wait_task: wait done (rval=%d)\n", rval); return rval; } void isns_end_task(struct isns_task_s *task_p) { DBG("isns_end_task: %p\n", task_p); if (task_p == task_p->cfg_p->curtask_p) task_p->cfg_p->curtask_p = NULL; if (task_p->waitable) pthread_cond_signal(&task_p->wait_condvar); isns_free_task(task_p); } static void isns_task_discover_server(struct isns_task_s *task_p) { /* discover server here */ DBG("isns_task_discover_server: entered\n"); isns_end_task(task_p); } /* * isns_task_reconnect_server() */ static void isns_task_reconnect_server(struct isns_task_s *task_p) { struct addrinfo *ai_p; int rv; DBG("isns_task_reconnect_server: entered\n"); ai_p = task_p->var.reconnect_server.ai_p; rv = isns_socket_create(&(task_p->cfg_p->sd), ai_p->ai_family, ai_p->ai_socktype); if (rv != 0) return; rv = isns_socket_connect(task_p->cfg_p->sd, ai_p->ai_addr, ai_p->ai_addrlen); if (rv != 0) { /* Add ISNS_EVT_TIMER_RECON to kqueue */ rv = isns_change_kevent_list(task_p->cfg_p, (uintptr_t)ISNS_EVT_TIMER_RECON, EVFILT_TIMER, EV_ADD, (int64_t)ISNS_EVT_TIMER_RECON_PERIOD_MS, (intptr_t)isns_kevent_timer_recon); if (rv == -1) DBG("isns_task_reconnect_server: error on " "isns_change_kevent_list(1)\n"); } else { task_p->cfg_p->sd_connected = 1; /* Add cfg_p->sd to kqueue */ rv = isns_change_kevent_list(task_p->cfg_p, (uintptr_t)(task_p->cfg_p->sd), EVFILT_READ, EV_ADD | EV_CLEAR, (int64_t)0, (intptr_t)isns_kevent_socket); if (rv == -1) DBG("isns_task_reconnect_server: error on " "isns_change_kevent_lists(2)\n"); isns_end_task(task_p); } } /* * isns_task_send_pdu() * * We send all of the pdu's associated with transaction task_p->trans_p here. * * Assumptions: * (1) task_p->trans_p->pdu_req_list is an ordered (seq_id) list of * related (trans_id), appropriately sized pdus to be sent. The first * pdu has flag ISNS_FLAG_FIRST_PDU set and the last pdu has flag * ISNS_FLAG_LAST_PDU set. */ static void isns_task_send_pdu(struct isns_task_s *task_p) { struct iovec *iovp; struct isns_config_s *cfg_p; struct isns_pdu_s *pdu_p; /* points to first pdu in pdu_req_list */ struct isns_buffer_s *buf_p; ssize_t bytes_written; ssize_t count; size_t bytes_to_write; int iovcnt, cur_iovec; char *ptr; DBG("isns_task_send_pdu: entered\n"); cfg_p = task_p->cfg_p; pdu_p = task_p->var.send_pdu.pdu_p; while (pdu_p != NULL) { /* adjust byte order if necessary */ if (pdu_p->byteorder_host) { pdu_p->hdr.isnsp_version = isns_htons(pdu_p->hdr. isnsp_version); pdu_p->hdr.func_id = isns_htons(pdu_p->hdr.func_id); pdu_p->hdr.payload_len = isns_htons(pdu_p->hdr. payload_len); pdu_p->hdr.flags = isns_htons(pdu_p->hdr.flags); pdu_p->hdr.trans_id = isns_htons(pdu_p->hdr.trans_id); pdu_p->hdr.seq_id = isns_htons(pdu_p->hdr.seq_id); pdu_p->byteorder_host = 0; } DUMP_PDU(pdu_p); /* send PDU via socket here */ write_buf[0].iov_base = &(pdu_p->hdr); write_buf[0].iov_len = sizeof(pdu_p->hdr); bytes_to_write = write_buf[0].iov_len; iovcnt = 1; buf_p = pdu_p->payload_p; while (buf_p != NULL) { write_buf[iovcnt].iov_base = isns_buffer_data(buf_p,0); write_buf[iovcnt].iov_len = buf_p->cur_len; bytes_to_write += write_buf[iovcnt].iov_len; iovcnt++; buf_p = buf_p->next; } /* iovcnt and bytes_to_write are initialized */ cur_iovec = 0; buf_p = ((struct isns_buffer_s *)(void *)pdu_p) - 1; do { iovp = &(write_buf[cur_iovec]); bytes_written = isns_socket_writev(cfg_p->sd, iovp, iovcnt); if (bytes_written == -1) { DBG("isns_task_send_pdu: error on " "isns_socket_writev\n"); isns_socket_close(cfg_p->sd); cfg_p->sd_connected = 0; isns_process_connection_loss(cfg_p); if (cfg_p->pdu_in_p != NULL) { isns_free_pdu(cfg_p->pdu_in_p); cfg_p->pdu_in_p = NULL; } break; } if (bytes_written < (ssize_t)bytes_to_write) { count = bytes_written; while (buf_p != NULL) { /* -OR- while (1) */ if ((unsigned)count >= write_buf[ cur_iovec].iov_len) { count -= write_buf[cur_iovec]. iov_len; if (cur_iovec == 0) buf_p = pdu_p-> payload_p; else buf_p = buf_p->next; cur_iovec++; iovcnt--; if (count == 0) { /* Do another write */ break; } else { /* Look at new iovec */ continue; } } else { write_buf[cur_iovec].iov_len -= count; ptr = (char *) write_buf[cur_iovec].iov_base; ptr += count; write_buf[cur_iovec].iov_base = ptr; /* Do another write */ break; } } } bytes_to_write -= bytes_written; } while (bytes_to_write); pdu_p = pdu_p->next; } if (!task_p->waitable) { isns_complete_trans(task_p->var.send_pdu.trans_p); isns_end_task(task_p); } } /* * isns_task_init_socket_io() */ static void isns_task_init_socket_io(struct isns_task_s *task_p) { struct isns_config_s *cfg_p; int rv; DBG("isns_task_init_socket_io: entered\n"); cfg_p = task_p->cfg_p; if (cfg_p->sd_connected) { isns_socket_close(cfg_p->sd); cfg_p->sd_connected = 0; /* We may have received part of an unsolicited/duplicate pdu */ if (cfg_p->pdu_in_p != NULL) { isns_free_pdu(cfg_p->pdu_in_p); cfg_p->pdu_in_p = NULL; } } /* May have an allocated 'struct addrinfo', whether connected or not */ if (cfg_p->ai_p != NULL) { isns_free(cfg_p->ai_p); cfg_p->ai_p = NULL; } cfg_p->sd = task_p->var.init_socket_io.sd; cfg_p->ai_p = task_p->var.init_socket_io.ai_p; cfg_p->sd_connected = 1; /* Add cfg_p->sd to kqueue */ rv = isns_change_kevent_list(cfg_p, (uintptr_t)cfg_p->sd, EVFILT_READ, EV_ADD | EV_CLEAR, (int64_t)0, (intptr_t)isns_kevent_socket); if (rv == -1) DBG("isns_task_init_socket_io: error on " "isns_change_kevent_list\n"); isns_end_task(task_p); } /* * isns_task_init_refresh(struct isns_task_s *task_p) */ static void isns_task_init_refresh(struct isns_task_s *task_p) { struct isns_config_s *cfg_p; int rval; DBG("isns_task_init_refresh: entered\n"); /* Free any previous refresh info. */ cfg_p = task_p->cfg_p; if (cfg_p->refresh_p != NULL) { if (cfg_p->refresh_p->trans_p != NULL) isns_free_trans(cfg_p->refresh_p->trans_p); isns_free(cfg_p->refresh_p); } /* Assign new refresh info into config struct. */ cfg_p->refresh_p = task_p->var.init_refresh.ref_p; cfg_p->refresh_p->trans_p = NULL; /* Setup (or change) kevent timer for reg refresh. */ rval = isns_change_kevent_list(cfg_p, (uintptr_t)ISNS_EVT_TIMER_REFRESH, EVFILT_TIMER, EV_ADD | EV_ENABLE, (int64_t)cfg_p->refresh_p->interval * 1000, (intptr_t)isns_kevent_timer_refresh); if (rval == -1) { DBG("isns_task_init_refresh: " "error on isns_change_kevent_list()\n"); } isns_end_task(task_p); } struct isns_task_s * isns_new_task(struct isns_config_s *cfg_p, uint8_t task_type, int waitable) { struct isns_buffer_s *buf_p; struct isns_task_s *task_p; pthread_mutexattr_t mutexattr; pthread_condattr_t condattr; task_p = NULL; buf_p = isns_new_buffer((int)sizeof(struct isns_task_s)); if (buf_p) { task_p = (struct isns_task_s *)isns_buffer_data(buf_p, 0); task_p->cfg_p = cfg_p; task_p->task_type = task_type; task_p->waitable = waitable; if (waitable) { pthread_mutexattr_init(&mutexattr); pthread_mutexattr_settype(&mutexattr, ISNS_MUTEX_TYPE_NORMAL); pthread_mutex_init(&task_p->wait_mutex, &mutexattr); pthread_condattr_init(&condattr); pthread_cond_init(&task_p->wait_condvar, &condattr); task_p->wait_ref_count = 2; } } DBG("isns_new_task: %p, waitable=%d\n", task_p, waitable); return task_p; } void isns_free_task(struct isns_task_s *task_p) { struct isns_buffer_s *buf_p; int ref_count; DBG("isns_free_task: %p\n", task_p); if (task_p->waitable) { pthread_mutex_lock(&task_p->wait_mutex); ref_count = --task_p->wait_ref_count; pthread_mutex_unlock(&task_p->wait_mutex); if (ref_count > 0) { DBG("isns_free_task: ref_count > 0, no free done\n"); return; } pthread_mutex_destroy(&task_p->wait_mutex); pthread_cond_destroy(&task_p->wait_condvar); } buf_p = ((struct isns_buffer_s *)(void *)(task_p))-1; isns_free_buffer(buf_p); } void isns_taskq_insert_head(struct isns_config_s *cfg_p, struct isns_task_s *task_p) { pthread_mutex_lock(&cfg_p->taskq_mutex); SIMPLEQ_INSERT_HEAD(&cfg_p->taskq_head, task_p, taskq_entry); pthread_mutex_unlock(&cfg_p->taskq_mutex); DBG("isns_taskq_insert_head: %p\n", task_p); } void isns_taskq_insert_tail(struct isns_config_s *cfg_p, struct isns_task_s *task_p) { pthread_mutex_lock(&cfg_p->taskq_mutex); SIMPLEQ_INSERT_TAIL(&cfg_p->taskq_head, task_p, taskq_entry); pthread_mutex_unlock(&cfg_p->taskq_mutex); DBG("isns_taskq_insert_tail: %p\n", task_p); } struct isns_task_s * isns_taskq_remove(struct isns_config_s *cfg_p) { struct isns_task_s *task_p = NULL; pthread_mutex_lock(&cfg_p->taskq_mutex); if ((task_p = SIMPLEQ_FIRST(&cfg_p->taskq_head)) != NULL) SIMPLEQ_REMOVE_HEAD(&cfg_p->taskq_head, taskq_entry); pthread_mutex_unlock(&cfg_p->taskq_mutex); DBG("isns_taskq_remove: %p\n", task_p); return task_p; } struct isns_task_s * isns_taskq_remove_trans(struct isns_config_s *cfg_p, uint16_t trans_id) { struct isns_task_s *task_p; int trans_found; trans_found = 0; pthread_mutex_lock(&cfg_p->taskq_mutex); SIMPLEQ_FOREACH(task_p, &cfg_p->taskq_head, taskq_entry) { if ((task_p->task_type == ISNS_TASK_SEND_PDU) && (task_p->var.send_pdu.trans_p->id == trans_id)) { trans_found = 1; break; } } if (trans_found) { SIMPLEQ_REMOVE(&cfg_p->taskq_head, task_p, isns_task_s, taskq_entry); } pthread_mutex_unlock(&cfg_p->taskq_mutex); return (trans_found ? task_p : NULL); }