/* * bronx_database.c -- Database handling functionality. * * Copyright (C) 2008 Groundwork Open Source * Written by Daniel Emmanuel Feinsmith * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License * as published by the Free Software Foundation; either version 2 * of the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor * Boston, MA 02110-1301, USA. * * Change Log: * DEF Created on September 17, 2007, 1:00 PM */ #include "bronx.h" #include "unistd.h" /* * Globals. */ int _database_any_routes_dirty = FALSE; /* * Functionality. */ void database_set_any_routes_dirty(int n) { _database_any_routes_dirty = n; } int database_get_any_routes_dirty() { int n; n = _database_any_routes_dirty; return(n); } int database_set_dirty(route_definition *route) { exec_args_container exec_s; char *error_msg; apr_status_t rv; exec_s.route = route; // // Get the # of rows in the messages table. // rv = sqlite3_exec(route->database, "select count(*) from messages", database_set_dirty_callback, (void *)&exec_s, &error_msg); if (rv != 0) bronx_logprintf(BRONX_LOGGING_DEBUG, "{database_set_dirty} (route: %s) Error retrieving from spillover, rv=%d, error_msg=%s", route->route_name, rv, error_msg ? error_msg : "No Error Message"); if (error_msg) sqlite3_free(error_msg); // // And return if the database has something in it. // return(route->database_dirty); } int database_set_dirty_callback(void *param, int colCount, char **values, char **names) { exec_args_container *exec_s = (exec_args_container*)param; apr_thread_mutex_lock(exec_s->route->database_mutex); exec_s->route->database_dirty = colCount > 0 ? (atoi(values[0]) > 0 ? TRUE : FALSE) : FALSE; apr_thread_mutex_unlock(exec_s->route->database_mutex); return(0); // GOOD value. } void database_dirty_yes(route_definition *route) { apr_thread_mutex_lock(route->database_mutex); route->database_dirty = TRUE; _database_any_routes_dirty = TRUE; apr_thread_mutex_unlock(route->database_mutex); } int database_is_dirty(route_definition *route) { int is_dirty; apr_thread_mutex_lock(route->database_mutex); is_dirty = route->database_dirty; apr_thread_mutex_unlock(route->database_mutex); return(is_dirty); } apr_status_t database_open(configuration_criteria *config, route_definition *route) { char pathname[256], *error_msg; apr_status_t rv; // // Open/Create Spillover database. // sprintf(pathname, "%s/%s.bronx", config->database_dir, route->route_name); bronx_logprintf(BRONX_LOGGING_NORMAL, "(route: %s) Creating database, name='%s'", route->route_name, pathname); if ((rv = sqlite3_open(pathname, &route->database))) { bronx_logprintf(BRONX_LOGGING_NORMAL, "(route: %s) Cannot create or open database, directory permissions issue?", route->route_name); sqlite3_close(route->database); return(rv); } // // Initialize Schema. // bronx_logprintf(BRONX_LOGGING_NORMAL, "(route: %s) Database open, initializing schema.", route->route_name); if(sqlite3_exec(route->database, "CREATE TABLE IF NOT EXISTS messages(id integer primary key autoincrement, message blob, timestamp integer key)", NULL, NULL, &error_msg)) { if (error_msg) sqlite3_free(error_msg); sqlite3_close(route->database); bronx_logprintf(BRONX_LOGGING_NORMAL, "(route: %s) Error creating schema for database.", route->route_name); return(rv); } bronx_logprintf(BRONX_LOGGING_NORMAL, "(route: %s) Schema initialized, database operational.", route->route_name, pathname); if (error_msg) sqlite3_free(error_msg); // // Clear internal representation of # of // rows in the spillover database. // database_set_dirty(route); // // Exit. // return(APR_SUCCESS); } apr_status_t database_initialize(configuration_criteria *config, route_definition *route) { char pathname[256]; apr_status_t rv; // // Create Mutex // rv = apr_thread_mutex_create(&route->database_mutex, APR_THREAD_MUTEX_DEFAULT, route->pool); // // If it worked, continue to create the database. // if (rv == APR_SUCCESS) { sprintf(pathname, "%s/%s.bronx", config->database_dir, route->route_name); /* * First, remove the old database. * Our database is a spillover that is not intended to survive * a reboot or a process failure. If we are here, we are in startup * mode, so first we need to get rid of the old file before proceeding. */ unlink(pathname); bronx_logprintf(BRONX_LOGGING_NORMAL, "{database_initialize} (route: %s) Removed old database '%s'.", route->route_name, pathname); /* * Now re-create and open the repository. */ rv = database_open(config, route); } // // And Exit. // return(rv); } int add_message_to_route_databases(message *msg) { route_definition *route; char *marshalled_msg; int i; bronx_logprintf(BRONX_LOGGING_DEBUG, "{add_message_to_route_databases}"); for(i=0; i < _configuration->num_routes; i++) { route = &_configuration->routes[i]; bronx_logprintf(BRONX_LOGGING_DEBUG, "{add_message_to_route_databases} (route: %s)", route->route_name); if (route->backup) { marshalled_msg = marshall_properties(route, msg); if (marshalled_msg != NULL) { bronx_logprintf(BRONX_LOGGING_DEBUG, "{add_message_to_route_databases} (route: %s) Adding Message", route->route_name); add_message_to_database(route, marshalled_msg); bronx_logprintf(BRONX_LOGGING_DEBUG, "{add_message_to_route_databases} (route: %s) Done Adding Message", route->route_name); } } } return(APR_SUCCESS); } int add_message_to_database(route_definition *route, char *msg) { if (route->database != NULL) { char *sql, *error_msg; bronx_logprintf(BRONX_LOGGING_DEBUG, "{add_message_to_database} (route: %s) Writing message to database.", route->route_name); sql = sqlite3_mprintf("INSERT INTO messages(message, timestamp) VALUES ('%q', %d);", msg, time(NULL)); if(!sqlite3_exec(route->database, sql, NULL, NULL, &error_msg)) { // // Successful write to database. // sqlite3_free(sql); if (error_msg) sqlite3_free(error_msg); // // Indicate that the database has something in it. // database_dirty_yes(route); // // And send a signal to wake up and process the data. // apr_thread_cond_signal(_route_thread_cond_signal); } else { bronx_logprintf(BRONX_LOGGING_NORMAL, "(route: %s) Database error: %s.", route->route_name, error_msg); sqlite3_free(sql); if (error_msg) sqlite3_free(error_msg); return(1); } } return(0); } int process_backup_data() { int num_routes_dirty = 0; if (_configuration->num_routes) { route_definition *route; exec_args_container exec_s; apr_status_t rv; char *error_msg; int i, process_route=0; /* * Loop through all routes * and process all data in the database * for that route. */ bronx_logprintf(BRONX_LOGGING_DEBUG, "{route_manage_inbound} Processing backup data."); for(i=0; i < _configuration->num_routes; i++) { route = &_configuration->routes[i]; bronx_logprintf(BRONX_LOGGING_DEBUG, "{route_manage_inbound} backup=%d, (database !=NULL)=%d, database_is_dirty=%d, route->is_connected=%d", route->backup, route->database != NULL, database_is_dirty(route), route->is_connected); if (route->backup && route->database != NULL && database_is_dirty(route)) { if (!route->is_connected) { time_t time_now = time(NULL); if (time_now > (route->last_reconnect_attempt_time + _configuration->seconds_between_reconnect_attempts)) { bronx_logprintf(BRONX_LOGGING_DEBUG, "{route_manage_inbound} (route: %s) Attempting to reconnect to broken route.", route->route_name); route->last_reconnect_attempt_time = time_now; process_route = (assert_route_connect(route) == APR_SUCCESS); } } else process_route = 1; if (process_route) { bronx_logprintf(BRONX_LOGGING_DEBUG, "{route_manage_inbound} (route: %s) Attempting to forward messages from route spillover.", route->route_name); exec_s.route = route; rv = sqlite3_exec(route->database, "select * FROM messages ORDER BY timestamp", route_message_from_database, (void *)&exec_s, &error_msg); if (rv != 0) bronx_logprintf(BRONX_LOGGING_DEBUG, "{process_backup_data} (route: %s) Error reading from spillover database, rv=%d, error_msg='%s'", route->route_name, rv, error_msg ? error_msg : "No Error Message"); if (error_msg) sqlite3_free(error_msg); // // Reset the indicator of whether or not there are // rows in the table after our processing. // num_routes_dirty += database_set_dirty(route); } else num_routes_dirty += database_is_dirty(route); } } database_set_any_routes_dirty(num_routes_dirty > 0); } return(num_routes_dirty); } /* * * Callback method used to handle sending messages that were received from the database. * Routes a single message from the database. */ int route_message_from_database(void *param, int colCount, char **values, char **names) { exec_args_container *exec_s = (exec_args_container*)param; apr_status_t rv; // // Can we route it? // bronx_logprintf(BRONX_LOGGING_DEBUG, "{route_message_from_database} Trying to route."); rv = route_marshalled_message(exec_s->route, values[BRONXDB_MESSAGE_COL], FALSE); if (rv == APR_SUCCESS) { char *sql, *error_msg; // // Delete the message. // bronx_logprintf(BRONX_LOGGING_DEBUG, "{route_message_from_database} (route: %s) Successfully Routed from Spillover, removing from database, colCont=%d, name for id='%s', id='%s'.", exec_s->route->route_name, colCount, names[BRONXDB_ID_COL], values[BRONXDB_ID_COL]); sql = sqlite3_mprintf("DELETE FROM messages WHERE id = '%s'", values[BRONXDB_ID_COL]); sqlite3_exec(exec_s->route->database, sql, NULL, NULL, &error_msg); // // Clean Up. // sqlite3_free(sql); if (error_msg) sqlite3_free(error_msg); // // And Return with a positive result. // return(0); } return(1); } void database_purge_and_shutoff_backups(route_definition *route, char *error_msg) { // Error occurred while deleting. bronx_logprintf(BRONX_LOGGING_NORMAL, "(route: %s) database error: %s, turning off database backup for this route.", route->route_name, error_msg); /* * A presumption: Something is wrong with the database. * Simplistically, we purge the database and turn off backup * for this route. */ database_purge(route); route->backup = 0; } void database_purge(route_definition *route) { char *error_msg, *temp_msg; temp_msg = sqlite3_mprintf("DELETE FROM messages"); // Delete all items from the database sqlite3_exec(route->database, temp_msg, NULL, NULL, &error_msg); // Perform the deletion if (error_msg) sqlite3_free(error_msg); sqlite3_free(temp_msg); route->database_dirty = 0; }