/* * CINELERRA * Copyright (C) 2008 Adam Williams * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * */ #include "asset.h" #include "bcsignals.h" #include "brender.h" #include "clip.h" #include "condition.h" #include "bchash.h" #include "edl.h" #include "filesystem.h" #include "filexml.h" #include "language.h" #include "mutex.h" #include "mwindow.h" #include "packagedispatcher.h" #include "preferences.h" #include "render.h" #include "renderfarm.h" #include "renderfarmclient.h" #include "bctimer.h" #include "transportque.h" #include #include #include #include #include #include #include #include #include #include #include #include RenderFarmServer::RenderFarmServer( MWindow *mwindow, PackageDispatcher *packages, Preferences *preferences, int use_local_rate, int *result_return, int64_t *total_return, Mutex *total_return_lock, Asset *default_asset, EDL *edl, BRender *brender) { this->mwindow = mwindow; this->packages = packages; this->preferences = preferences; this->use_local_rate = use_local_rate; this->result_return = result_return; this->total_return = total_return; this->total_return_lock = total_return_lock; this->default_asset = default_asset; this->edl = edl; this->brender = brender; client_lock = new Mutex("RenderFarmServer::client_lock"); } RenderFarmServer::~RenderFarmServer() { clients.remove_all_objects(); delete client_lock; } // Open connections to clients. int RenderFarmServer::start_clients() { int result = 0; for(int i = 0; i < preferences->get_enabled_nodes() && !result; i++) { client_lock->lock("RenderFarmServer::start_clients"); RenderFarmServerThread *client = new RenderFarmServerThread(this, i); clients.append(client); result = client->start_loop(); client_lock->unlock(); } return result; } // The render farm must wait for all the clients to finish. int RenderFarmServer::wait_clients() { //printf("RenderFarmServer::wait_clients 1\n"); clients.remove_all_objects(); //printf("RenderFarmServer::wait_clients 2\n"); return 0; } // Waits for requests from every client. // Joins when the client is finished. RenderFarmServerThread::RenderFarmServerThread(RenderFarmServer *server, int number) : Thread(1, 0, 0) { this->server = server; this->number = number; socket_fd = -1; frames_per_second = 0; watchdog = 0; buffer = 0; datagram = 0; Thread::set_synchronous(1); } RenderFarmServerThread::~RenderFarmServerThread() { //printf("RenderFarmServerThread::~RenderFarmServerThread 1 %p\n", this); Thread::join(); //printf("RenderFarmServerThread::~RenderFarmServerThread 1\n"); if(socket_fd >= 0) close(socket_fd); if(watchdog) delete watchdog; if(buffer) delete [] buffer; if(datagram) delete [] datagram; //printf("RenderFarmServerThread::~RenderFarmServerThread 2\n"); } int RenderFarmServerThread::open_client(const char *hostname, int port) { int socket_fd = -1; int result = 0; // Open file for master node if(hostname[0] == '/') { if((socket_fd = socket(PF_UNIX, SOCK_STREAM, 0)) < 0) { perror(_("RenderFarmServerThread::start_loop: socket\n")); result = 1; } else { struct sockaddr_un addr; addr.sun_family = AF_FILE; strcpy(addr.sun_path, hostname); int size = (offsetof(struct sockaddr_un, sun_path) + strlen(hostname) + 1); // The master node is always created by BRender. Keep trying for 30 seconds. #define ATTEMPT_DELAY 100000 int done = 0; int attempt = 0; do { if(connect(socket_fd, (struct sockaddr*)&addr, size) < 0) { attempt++; if(attempt > 30000000 / ATTEMPT_DELAY) { fprintf(stderr, _("RenderFarmServerThread::open_client: %s: %s\n"), hostname, strerror(errno)); result = 1; } else usleep(ATTEMPT_DELAY); } else done = 1; }while(!result && !done); } } else // Open socket { if((socket_fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) { perror(_("RenderFarmServerThread::start_loop: socket")); result = 1; } else { // Open port struct sockaddr_in addr; struct hostent *hostinfo; addr.sin_family = AF_INET; addr.sin_port = htons(port); hostinfo = gethostbyname(hostname); if(hostinfo == NULL) { fprintf(stderr, _("RenderFarmServerThread::open_client: unknown host %s.\n"), hostname); result = 1; } else { addr.sin_addr = *(struct in_addr *) hostinfo->h_addr; if(connect(socket_fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) { fprintf(stderr, _("RenderFarmServerThread::open_client: %s: %s\n"), hostname, strerror(errno)); result = 1; } } } } if(result) socket_fd = -1; return socket_fd; } int RenderFarmServerThread::start_loop() { int result = 0; socket_fd = open_client(server->preferences->get_node_hostname(number), server->preferences->get_node_port(number)); if(socket_fd < 0) result = 1; if(!result) { watchdog = new RenderFarmWatchdog(this, 0); watchdog->start(); } if(!result) Thread::start(); return result; } int64_t RenderFarmServerThread::read_int64(int *error) { int temp = 0; if(!error) error = &temp; unsigned char data[sizeof(int64_t)]; *error = (read_socket((char*)data, sizeof(int64_t)) != sizeof(int64_t)); // Make it return 1 if error so it can be used to read a result code from the // server. int64_t result = 1; if(!*error) { result = (((int64_t)data[0]) << 56) | (((uint64_t)data[1]) << 48) | (((uint64_t)data[2]) << 40) | (((uint64_t)data[3]) << 32) | (((uint64_t)data[4]) << 24) | (((uint64_t)data[5]) << 16) | (((uint64_t)data[6]) << 8) | data[7]; } return result; } int RenderFarmServerThread::write_int64(int64_t value) { unsigned char data[sizeof(int64_t)]; data[0] = (value >> 56) & 0xff; data[1] = (value >> 48) & 0xff; data[2] = (value >> 40) & 0xff; data[3] = (value >> 32) & 0xff; data[4] = (value >> 24) & 0xff; data[5] = (value >> 16) & 0xff; data[6] = (value >> 8) & 0xff; data[7] = value & 0xff; return (write_socket((char*)data, sizeof(int64_t)) != sizeof(int64_t)); } int RenderFarmServerThread::read_socket(char *data, int len) { int bytes_read = 0; int offset = 0; //printf("RenderFarmServerThread::read_socket 1\n"); watchdog->begin_request(); while(len > 0 && bytes_read >= 0) { enable_cancel(); bytes_read = read(socket_fd, data + offset, len); disable_cancel(); if(bytes_read > 0) { len -= bytes_read; offset += bytes_read; } else if(bytes_read < 0) break; } watchdog->end_request(); //printf("RenderFarmServerThread::read_socket 10\n"); return offset; } int RenderFarmServerThread::write_socket(char *data, int len) { //printf("RenderFarmServerThread::write_socket 1\n"); int result = write(socket_fd, data, len); //printf("RenderFarmServerThread::write_socket 10\n"); return result; } void RenderFarmServerThread::reallocate_buffer(int size) { if(buffer && buffer_allocated < size) { delete [] buffer; buffer = 0; } if(!buffer && size) { buffer = new unsigned char[size]; buffer_allocated = size; } } void RenderFarmServerThread::run() { // Wait for requests unsigned char header[5]; int done = 0; int bytes_read = 0; buffer = 0; buffer_allocated = 0; // fs_server = new RenderFarmFSServer(this); // fs_server->initialize(); // Send command to run package renderer. write_int64(RENDERFARM_PACKAGES); while(!done) { // Wait for requests. // Requests consist of request ID's and accompanying buffers. // Get request ID. bytes_read = read_socket((char*)header, 5); //printf("RenderFarmServerThread::run 1\n"); if(bytes_read != 5) { done = 1; continue; } int request_id = header[0]; int64_t request_size = (((u_int32_t)header[1]) << 24) | (((u_int32_t)header[2]) << 16) | (((u_int32_t)header[3]) << 8) | (u_int32_t)header[4]; reallocate_buffer(request_size); // Get accompanying buffer bytes_read = read_socket((char*)buffer, request_size); //printf("RenderFarmServerThread::run 2 %d %jd %d\n", request_id, request_size, bytes_read); if(bytes_read != request_size) { done = 1; continue; } //printf("RenderFarmServerThread::run 3\n"); switch(request_id) { case RENDERFARM_PREFERENCES: send_preferences(); break; case RENDERFARM_ASSET: send_asset(); break; case RENDERFARM_EDL: send_edl(); break; case RENDERFARM_PACKAGE: send_package(buffer); break; case RENDERFARM_PROGRESS: set_progress(buffer); break; case RENDERFARM_SET_RESULT: set_result(buffer); break; case RENDERFARM_SET_VMAP: set_video_map(buffer); break; case RENDERFARM_GET_RESULT: get_result(); break; case RENDERFARM_DONE: //printf("RenderFarmServerThread::run 10\n"); done = 1; break; case RENDERFARM_KEEPALIVE: break; default: // if(!fs_server->handle_request(request_id, request_size, (unsigned char*)buffer)) { printf(_("RenderFarmServerThread::run: unknown request %02x\n"), request_id); } break; } //printf("RenderFarmServerThread::run 10 %d %jd\n", request_id, request_size); } // Don't let watchdog kill the entire renderfarm when a client finishes // normally. if(watchdog) { //printf("RenderFarmServerThread::run 20\n"); delete watchdog; watchdog = 0; } // delete fs_server; } void RenderFarmServerThread::write_string(char *string) { int i, len; i = 0; len = strlen(string) + 1; datagram = new char[len + 4]; STORE_INT32(len); memcpy(datagram + i, string, len); write_socket((char*)datagram, len + 4); //printf("RenderFarmServerThread::write_string %02x%02x%02x%02x\n", // datagram[0], datagram[1], datagram[2], datagram[3]); delete [] datagram; datagram = 0; } void RenderFarmServerThread::send_preferences() { BC_Hash defaults; char *string; server->preferences->save_defaults(&defaults); defaults.save_string(string); write_string(string); free(string); } void RenderFarmServerThread::send_asset() { BC_Hash defaults; char *string1; // The asset must be sent in two segments. // One segment is stored in the EDL and contains decoding information. // One segment is stored in the asset and contains encoding information. server->default_asset->save_defaults(&defaults, 0, 1, 1, 1, 1, 1); defaults.save_string(string1); FileXML file; server->default_asset->write(&file, 0, 0); file.terminate_string(); write_string(string1); write_string(file.string()); free(string1); } void RenderFarmServerThread::send_edl() { FileXML file; // Save the XML server->edl->save_xml(&file, 0, 0, 0); file.terminate_string(); //printf("RenderFarmServerThread::send_edl\n%s\n\n", file.string); write_string(file.string()); //printf("RenderFarmServerThread::send_edl 2\n"); } void RenderFarmServerThread::send_package(unsigned char *buffer) { this->frames_per_second = (double)((((u_int32_t)buffer[0]) << 24) | (((u_int32_t)buffer[1]) << 16) | (((u_int32_t)buffer[2]) << 8) | ((u_int32_t)buffer[3])) / 65536.0; //printf("RenderFarmServerThread::send_package 1 %f\n", frames_per_second); RenderPackage *package = server->packages->get_package(frames_per_second, number, server->use_local_rate); //printf("RenderFarmServerThread::send_package 2\n"); datagram = new char[BCTEXTLEN]; // No more packages if(!package) { //printf("RenderFarmServerThread::send_package 1\n"); datagram[0] = datagram[1] = datagram[2] = datagram[3] = 0; write_socket(datagram, 4); } else // Encode package { //printf("RenderFarmServerThread::send_package 10\n"); int i = 4; strcpy(&datagram[i], package->path); i += strlen(package->path); datagram[i++] = 0; STORE_INT32(package->audio_start); STORE_INT32(package->audio_end); STORE_INT32(package->video_start); STORE_INT32(package->video_end); int use_brender = (server->brender ? 1 : 0); STORE_INT32(use_brender); STORE_INT32(package->audio_do); STORE_INT32(package->video_do); int len = i; i = 0; STORE_INT32(len - 4); write_socket(datagram, len); } delete [] datagram; datagram = 0; } void RenderFarmServerThread::set_progress(unsigned char *buffer) { server->total_return_lock->lock("RenderFarmServerThread::set_progress"); *server->total_return += (int64_t)(((u_int32_t)buffer[0]) << 24) | (((u_int32_t)buffer[1]) << 16) | (((u_int32_t)buffer[2]) << 8) | ((u_int32_t)buffer[3]); frames_per_second = (double)((((u_int32_t)buffer[4]) << 24) | (((u_int32_t)buffer[5]) << 16) | (((u_int32_t)buffer[6]) << 8) | ((u_int32_t)buffer[7])) / 65536.0; server->total_return_lock->unlock(); server->preferences->set_rate(frames_per_second, number); // This locks the preferences if(server->mwindow) server->mwindow->preferences->copy_rates_from( server->preferences); } int RenderFarmServerThread::set_video_map(unsigned char *buffer) { if(server->brender) { server->brender->set_video_map((int64_t)(((u_int32_t)buffer[0]) << 24) | (((u_int32_t)buffer[1]) << 16) | (((u_int32_t)buffer[2]) << 8) | ((u_int32_t)buffer[3]), (int64_t)(((u_int32_t)buffer[4]) << 24) | (((u_int32_t)buffer[5]) << 16) | (((u_int32_t)buffer[6]) << 8) | ((u_int32_t)buffer[7])); char return_value[1]; return_value[0] = 0; write_socket(return_value, 1); return 0; } return 1; } void RenderFarmServerThread::set_result(unsigned char *buffer) { //printf("RenderFarmServerThread::set_result %p\n", buffer); if(!*server->result_return) *server->result_return = buffer[0]; } void RenderFarmServerThread::get_result() { unsigned char data[1]; data[0] = *server->result_return; write_socket((char*)data, 1); } RenderFarmWatchdog::RenderFarmWatchdog( RenderFarmServerThread *server, RenderFarmClientThread *client) : Thread(1, 0, 0) { this->server = server; this->client = client; next_request = new Condition(0, "RenderFarmWatchdog::next_request", 0); request_complete = new Condition(0, "RenderFarmWatchdog::request_complete", 0); done = 0; } RenderFarmWatchdog::~RenderFarmWatchdog() { done = 1; next_request->unlock(); request_complete->unlock(); join(); delete next_request; delete request_complete; } void RenderFarmWatchdog::begin_request() { next_request->unlock(); } void RenderFarmWatchdog::end_request() { request_complete->unlock(); } void RenderFarmWatchdog::run() { while(!done) { next_request->lock("RenderFarmWatchdog::run"); int result = request_complete->timed_lock(RENDERFARM_TIMEOUT * 1000000, "RenderFarmWatchdog::run"); //printf("RenderFarmWatchdog::run 1 %d\n", result); if(result) { if(client) { printf("RenderFarmWatchdog::run 1 killing client pid %d\n", client->pid); kill(client->pid, SIGKILL); } else if(server) { printf("RenderFarmWatchdog::run 1 killing server thread %p\n", server); server->cancel(); unsigned char buffer[4]; buffer[0] = 1; server->set_result(buffer); } done = 1; } } }