4 * Copyright (C) 2008 Adam Williams <broadcast at earthling dot net>
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 #include "bcsignals.h"
26 #include "condition.h"
29 #include "filesystem.h"
35 #include "packagedispatcher.h"
36 #include "preferences.h"
38 #include "renderfarm.h"
39 #include "renderfarmclient.h"
41 #include "transportque.h"
44 #include <arpa/inet.h>
48 #include <netinet/in.h>
52 #include <sys/socket.h>
53 #include <sys/types.h>
// RenderFarmServer constructor.
// Assigns each argument to the member of the same name and creates the
// mutex that start_clients() holds while it registers a client thread.
// NOTE(review): only part of the parameter list is visible here; confirm the
// order and types of the remaining arguments against the class declaration.
60 RenderFarmServer::RenderFarmServer(
62 PackageDispatcher *packages,
63 Preferences *preferences,
66 int64_t *total_return,
67 Mutex *total_return_lock,
72 this->mwindow = mwindow;
73 this->packages = packages;
74 this->preferences = preferences;
75 this->use_local_rate = use_local_rate;
76 this->result_return = result_return;
77 this->total_return = total_return;
78 this->total_return_lock = total_return_lock;
79 this->default_asset = default_asset;
81 this->brender = brender;
// Allocated with new; the matching delete must live in the destructor.
82 client_lock = new Mutex("RenderFarmServer::client_lock");
// Destructor: empties the client list with remove_all_objects().
// NOTE(review): presumably remove_all_objects() deletes the
// RenderFarmServerThread objects appended by start_clients() -- confirm
// against the container implementation. Also confirm that client_lock,
// allocated in the constructor, is released here.
85 RenderFarmServer::~RenderFarmServer()
87 clients.remove_all_objects();
91 // Open connections to clients.
// Creates one RenderFarmServerThread for each enabled node reported by
// preferences->get_enabled_nodes(), and stops at the first iteration that
// leaves `result` nonzero.
// NOTE(review): presumably returns `result` (0 on success) -- confirm.
92 int RenderFarmServer::start_clients()
96 for(int i = 0; i < preferences->get_enabled_nodes() && !result; i++)
// client_lock is held while the client list is modified and the thread is
// started.
98 client_lock->lock("RenderFarmServer::start_clients");
99 RenderFarmServerThread *client = new RenderFarmServerThread(this,
// The thread is appended before start_loop() runs, so a client whose
// connection fails is still owned by `clients`.
101 clients.append(client);
103 result = client->start_loop();
104 client_lock->unlock();
110 // The render farm must wait for all the clients to finish.
// The only visible action is clearing the client list.
// NOTE(review): the waiting presumably happens while each
// RenderFarmServerThread is destroyed (the threads are marked synchronous in
// their constructor) -- confirm against Thread's destructor.
111 int RenderFarmServer::wait_clients()
113 //printf("RenderFarmServer::wait_clients 1\n");
114 clients.remove_all_objects();
115 //printf("RenderFarmServer::wait_clients 2\n");
130 // Waits for requests from every client.
131 // Joins when the client is finished.
// Constructor: records the owning server and this thread's node number,
// and resets the measured frame rate to 0.
// NOTE(review): the destructor tests socket_fd, watchdog, buffer and
// datagram; confirm all four are initialised in this constructor.
132 RenderFarmServerThread::RenderFarmServerThread(RenderFarmServer *server,
136 this->server = server;
137 this->number = number;
139 frames_per_second = 0;
// Marks the thread synchronous.
// NOTE(review): presumably this makes the thread joinable -- confirm.
143 Thread::set_synchronous(1);
// Destructor: closes the socket if one is open (socket_fd >= 0) and frees
// the watchdog, the request buffer and the last outgoing datagram.
// The null checks before delete / delete[] are redundant in C++ but harmless.
148 RenderFarmServerThread::~RenderFarmServerThread()
150 //printf("RenderFarmServerThread::~RenderFarmServerThread 1 %p\n", this);
152 //printf("RenderFarmServerThread::~RenderFarmServerThread 1\n");
153 if(socket_fd >= 0) close(socket_fd);
154 if(watchdog) delete watchdog;
155 if(buffer) delete [] buffer;
156 if(datagram) delete [] datagram;
157 //printf("RenderFarmServerThread::~RenderFarmServerThread 2\n");
// Opens a stream socket to one render node and stores the descriptor in
// socket_fd.
//   hostname - a path beginning with '/' selects a UNIX-domain socket;
//              anything else is resolved with gethostbyname() and
//              connected over TCP.
//   port     - TCP port, used only on the network path.
// When `result` is nonzero at the end, socket_fd is set to -1.
// start_loop() treats the return value as the descriptor and a negative
// value as failure.
161 int RenderFarmServerThread::open_client(const char *hostname, int port)
166 // Open file for master node
167 if(hostname[0] == '/')
169 if((socket_fd = socket(PF_UNIX, SOCK_STREAM, 0)) < 0)
// NOTE(review): the message names start_loop although this is open_client,
// and the trailing "\n" precedes the text that perror() appends.
171 perror(_("RenderFarmServerThread::start_loop: socket\n"));
176 struct sockaddr_un addr;
177 addr.sun_family = AF_FILE;
// NOTE(review): strcpy() is unbounded; a hostname longer than
// sizeof(addr.sun_path) - 1 overflows addr. Consider checking the length
// first.
178 strcpy(addr.sun_path, hostname);
// Address length passed to connect(): header plus path plus terminating NUL.
179 int size = (offsetof(struct sockaddr_un, sun_path) +
180 strlen(hostname) + 1);
182 // The master node is always created by BRender. Keep trying for 30 seconds.
// ATTEMPT_DELAY is passed to usleep() (microseconds) between attempts;
// the limit 30000000 / ATTEMPT_DELAY works out to 300 attempts.
184 #define ATTEMPT_DELAY 100000
190 if(connect(socket_fd, (struct sockaddr*)&addr, size) < 0)
193 if(attempt > 30000000 / ATTEMPT_DELAY)
195 fprintf(stderr, _("RenderFarmServerThread::open_client: %s: %s\n"),
201 usleep(ATTEMPT_DELAY);
205 }while(!result && !done);
// Network path: TCP socket to hostname:port.
211 if((socket_fd = socket(PF_INET, SOCK_STREAM, 0)) < 0)
213 perror(_("RenderFarmServerThread::start_loop: socket"));
219 struct sockaddr_in addr;
220 struct hostent *hostinfo;
221 addr.sin_family = AF_INET;
222 addr.sin_port = htons(port);
// NOTE(review): gethostbyname() is obsolete, IPv4-only here, and returns a
// pointer to static storage; getaddrinfo() is the modern replacement.
223 hostinfo = gethostbyname(hostname);
226 fprintf(stderr, _("RenderFarmServerThread::open_client: unknown host %s.\n"),
232 addr.sin_addr = *(struct in_addr *) hostinfo->h_addr;
234 if(connect(socket_fd, (struct sockaddr*)&addr, sizeof(addr)) < 0)
236 fprintf(stderr, _("RenderFarmServerThread::open_client: %s: %s\n"),
// NOTE(review): confirm the descriptor is closed on the failure paths before
// it is overwritten with -1, otherwise it leaks.
245 if(result) socket_fd = -1;
// Connects to this thread's node, using the hostname and port that
// server->preferences holds for `number`, creates the watchdog, and starts
// the thread when no error was recorded.
// NOTE(review): presumably returns `result` (nonzero when the connection
// failed); start_clients() stops its loop on a nonzero value.
250 int RenderFarmServerThread::start_loop()
254 socket_fd = open_client(server->preferences->get_node_hostname(number),
255 server->preferences->get_node_port(number));
257 if(socket_fd < 0) result = 1;
// Server-side watchdog: the client argument is 0.
261 watchdog = new RenderFarmWatchdog(this, 0);
265 if(!result) Thread::start();
// Reads sizeof(int64_t) bytes from the socket and assembles them most
// significant byte first (data[0] ends up in bits 56..63).
//   error - optional; when null a local variable is used so the status can
//           always be stored. It is set from the comparison of
//           read_socket()'s return value.
280 int64_t RenderFarmServerThread::read_int64(int *error)
283 if(!error) error = &temp;
285 unsigned char data[sizeof(int64_t)];
286 *error = (read_socket((char*)data, sizeof(int64_t)) !=
289 // Make it return 1 if error so it can be used to read a result code from the
// NOTE(review): data[0] is cast to int64_t while the other bytes use
// uint64_t; shifting a bit into the sign position of a signed type is not
// portable before C++20. Consider uint64_t here as well.
294 result = (((int64_t)data[0]) << 56) |
295 (((uint64_t)data[1]) << 48) |
296 (((uint64_t)data[2]) << 40) |
297 (((uint64_t)data[3]) << 32) |
298 (((uint64_t)data[4]) << 24) |
299 (((uint64_t)data[5]) << 16) |
300 (((uint64_t)data[6]) << 8) |
// Serialises `value` into 8 bytes, most significant byte first, and sends
// them with write_socket(). This is the counterpart of read_int64().
// Returns the result of comparing write_socket()'s return value.
// NOTE(review): presumably nonzero means the write was short -- confirm.
306 int RenderFarmServerThread::write_int64(int64_t value)
308 unsigned char data[sizeof(int64_t)];
309 data[0] = (value >> 56) & 0xff;
310 data[1] = (value >> 48) & 0xff;
311 data[2] = (value >> 40) & 0xff;
312 data[3] = (value >> 32) & 0xff;
313 data[4] = (value >> 24) & 0xff;
314 data[5] = (value >> 16) & 0xff;
315 data[6] = (value >> 8) & 0xff;
316 data[7] = value & 0xff;
317 return (write_socket((char*)data, sizeof(int64_t)) !=
// Reads from socket_fd into `data`, calling read() repeatedly while `len`
// is positive and the previous read() did not return a negative value.
// The loop is bracketed by the watchdog's begin_request() / end_request()
// calls.
// NOTE(review): callers compare the return value with the requested length,
// so this presumably returns the total number of bytes read -- confirm, and
// confirm `len` is reduced on each pass.
323 int RenderFarmServerThread::read_socket(char *data, int len)
327 //printf("RenderFarmServerThread::read_socket 1\n");
328 watchdog->begin_request();
329 while(len > 0 && bytes_read >= 0)
332 bytes_read = read(socket_fd, data + offset, len);
337 offset += bytes_read;
343 watchdog->end_request();
344 //printf("RenderFarmServerThread::read_socket 10\n");
// Sends `len` bytes from `data` with one write() call on socket_fd.
// Unlike read_socket(), no watchdog calls appear here.
// NOTE(review): write() may transfer fewer bytes than requested and no retry
// loop is visible; presumably the write() result is returned so callers can
// detect a short write -- confirm.
349 int RenderFarmServerThread::write_socket(char *data, int len)
351 //printf("RenderFarmServerThread::write_socket 1\n");
352 int result = write(socket_fd, data, len);
353 //printf("RenderFarmServerThread::write_socket 10\n");
// Grows the request buffer so it can hold `size` bytes. The existing buffer
// is handled specially when one exists and is smaller than `size`.
// NOTE(review): presumably the old buffer is freed in that branch and the
// allocation below only runs when no buffer remains -- confirm. The old
// contents are not visibly preserved.
358 void RenderFarmServerThread::reallocate_buffer(int size)
360 if(buffer && buffer_allocated < size)
368 buffer = new unsigned char[size];
369 buffer_allocated = size;
// Thread body: serves requests from one render node.
// It first sends RENDERFARM_PACKAGES, then reads requests. Each request has
// a 5 byte header (1 byte request id, then the payload size as a 4 byte
// big-endian integer) followed by the payload, which is read into `buffer`
// and dispatched on the request id.
373 void RenderFarmServerThread::run()
376 unsigned char header[5];
382 buffer_allocated = 0;
383 // fs_server = new RenderFarmFSServer(this);
384 // fs_server->initialize();
387 // Send command to run package renderer.
388 write_int64(RENDERFARM_PACKAGES);
395 // Wait for requests.
396 // Requests consist of request ID's and accompanying buffers.
398 bytes_read = read_socket((char*)header, 5);
399 //printf("RenderFarmServerThread::run 1\n");
406 int request_id = header[0];
// NOTE(review): the size is supplied by the peer and passed on unchecked;
// reallocate_buffer() and read_socket() take int, so values above INT_MAX
// would wrap -- consider validating it.
407 int64_t request_size = (((u_int32_t)header[1]) << 24) |
408 (((u_int32_t)header[2]) << 16) |
409 (((u_int32_t)header[3]) << 8) |
410 (u_int32_t)header[4];
412 reallocate_buffer(request_size);
414 // Get accompanying buffer
415 bytes_read = read_socket((char*)buffer, request_size);
417 //printf("RenderFarmServerThread::run 2 %d " _LD " %d\n", request_id, request_size, bytes_read);
// A payload shorter than announced is handled as a special case.
418 if(bytes_read != request_size)
423 //printf("RenderFarmServerThread::run 3\n");
// Dispatch on the request id.
427 case RENDERFARM_PREFERENCES:
431 case RENDERFARM_ASSET:
439 case RENDERFARM_PACKAGE:
440 send_package(buffer);
443 case RENDERFARM_PROGRESS:
444 set_progress(buffer);
447 case RENDERFARM_SET_RESULT:
451 case RENDERFARM_SET_VMAP:
452 set_video_map(buffer);
455 case RENDERFARM_GET_RESULT:
459 case RENDERFARM_DONE:
460 //printf("RenderFarmServerThread::run 10\n");
464 case RENDERFARM_KEEPALIVE:
468 // if(!fs_server->handle_request(request_id, request_size, (unsigned char*)buffer))
470 printf(_("RenderFarmServerThread::run: unknown request %02x\n"), request_id);
474 //printf("RenderFarmServerThread::run 10 %d " _LD "\n", request_id, request_size);
477 // Don't let watchdog kill the entire renderfarm when a client finishes
481 //printf("RenderFarmServerThread::run 20\n");
// Sends a NUL-terminated string as one datagram: 4 leading bytes followed
// by the string including its terminating NUL (len = strlen + 1).
// NOTE(review): the 4 leading bytes presumably carry `len` as a big-endian
// integer -- confirm. Also confirm `datagram` is freed (either before this
// reassignment or after the write) so that repeated calls do not leak.
489 int RenderFarmServerThread::write_string(char *string)
494 len = strlen(string) + 1;
495 datagram = new char[len + 4];
497 memcpy(datagram + i, string, len);
498 write_socket((char*)datagram, len + 4);
499 //printf("RenderFarmServerThread::write_string %02x%02x%02x%02x\n",
500 // datagram[0], datagram[1], datagram[2], datagram[3]);
// Sends the server's preferences to the node: they are saved into a local
// `defaults` object, serialised with save_string(), and transmitted with
// write_string().
// NOTE(review): ownership of `string` after save_string() is not visible
// here -- confirm it is released.
507 void RenderFarmServerThread::send_preferences()
512 server->preferences->save_defaults(&defaults);
513 defaults.save_string(string);
514 write_string(string);
// Sends the default asset to the node as two consecutive strings: first the
// defaults-style serialisation (string1), then the text written by
// default_asset->write().
519 void RenderFarmServerThread::send_asset()
524 // The asset must be sent in two segments.
525 // One segment is stored in the EDL and contains decoding information.
526 // One segment is stored in the asset and contains encoding information.
527 server->default_asset->save_defaults(&defaults,
534 defaults.save_string(string1);
537 server->default_asset->write(&file, 0, 0);
538 file.terminate_string();
// The order matters: the receiver must read string1 before the file text.
540 write_string(string1);
541 write_string(file.string());
// Sends the EDL to the node: server->edl is saved as XML into `file`, the
// text is terminated, and the result is transmitted with write_string().
546 void RenderFarmServerThread::send_edl()
551 server->edl->save_xml(&file,
555 file.terminate_string();
556 //printf("RenderFarmServerThread::send_edl\n%s\n\n", file.string);
558 write_string(file.string());
559 //printf("RenderFarmServerThread::send_edl 2\n");
// Answers a RENDERFARM_PACKAGE request.
//   buffer - request payload; bytes 0..3 hold a big-endian 32 bit value
//            that is scaled into frames_per_second.
// A package is requested from the dispatcher and sent as a datagram holding
// the output path, the audio and video ranges and a BRender flag. A second
// reply form consists of four zero bytes only.
// NOTE(review): the four zero bytes are presumably sent when no package is
// available -- confirm.
563 void RenderFarmServerThread::send_package(unsigned char *buffer)
565 this->frames_per_second = (double)((((u_int32_t)buffer[0]) << 24) |
566 (((u_int32_t)buffer[1]) << 16) |
567 (((u_int32_t)buffer[2]) << 8) |
568 ((u_int32_t)buffer[3])) /
571 //printf("RenderFarmServerThread::send_package 1 %f\n", frames_per_second);
572 RenderPackage *package =
573 server->packages->get_package(frames_per_second,
575 server->use_local_rate);
577 //printf("RenderFarmServerThread::send_package 2\n");
// NOTE(review): confirm the previous `datagram` is freed before this
// reassignment, otherwise every request leaks BCTEXTLEN bytes.
578 datagram = new char[BCTEXTLEN];
583 //printf("RenderFarmServerThread::send_package 1\n");
584 datagram[0] = datagram[1] = datagram[2] = datagram[3] = 0;
585 write_socket(datagram, 4);
590 //printf("RenderFarmServerThread::send_package 10\n");
// NOTE(review): strcpy() is unbounded; the path plus the integer fields must
// fit in BCTEXTLEN bytes -- confirm or bound the copy.
592 strcpy(&datagram[i], package->path);
593 i += strlen(package->path);
596 STORE_INT32(package->audio_start);
597 STORE_INT32(package->audio_end);
598 STORE_INT32(package->video_start);
599 STORE_INT32(package->video_end);
600 int use_brender = (server->brender ? 1 : 0);
601 STORE_INT32(use_brender);
// The stored length excludes 4 bytes.
// NOTE(review): presumably those are the length field itself -- confirm.
605 STORE_INT32(len - 4);
607 write_socket(datagram, len);
// Handles a RENDERFARM_PROGRESS request.
//   buffer - bytes 0..3: big-endian 32 bit amount added to the shared total;
//            bytes 4..7: big-endian 32 bit value scaled into
//            frames_per_second.
// Both updates happen while total_return_lock is held. The new rate is then
// stored in the server preferences for this node and, when an mwindow
// exists, copied into its preferences.
614 void RenderFarmServerThread::set_progress(unsigned char *buffer)
616 server->total_return_lock->lock("RenderFarmServerThread::set_progress");
// `|` binds tighter than `+=`, so the whole decoded value is added.
617 *server->total_return += (int64_t)(((u_int32_t)buffer[0]) << 24) |
618 (((u_int32_t)buffer[1]) << 16) |
619 (((u_int32_t)buffer[2]) << 8) |
620 ((u_int32_t)buffer[3]);
621 frames_per_second = (double)((((u_int32_t)buffer[4]) << 24) |
622 (((u_int32_t)buffer[5]) << 16) |
623 (((u_int32_t)buffer[6]) << 8) |
624 ((u_int32_t)buffer[7])) /
626 server->total_return_lock->unlock();
628 server->preferences->set_rate(frames_per_second, number);
630 // This locks the preferences
631 if(server->mwindow) server->mwindow->preferences->copy_rates_from(
632 server->preferences);
// Handles a RENDERFARM_SET_VMAP request.
//   buffer - two big-endian 32 bit integers (bytes 0..3 and 4..7) that are
//            forwarded to server->brender->set_video_map().
// A single byte is written back to the node afterwards.
// NOTE(review): server->brender is dereferenced with no visible null check,
// while send_package() treats it as possibly null -- confirm this request
// only arrives for BRender jobs.
635 int RenderFarmServerThread::set_video_map(unsigned char *buffer)
639 server->brender->set_video_map((int64_t)(((u_int32_t)buffer[0]) << 24) |
640 (((u_int32_t)buffer[1]) << 16) |
641 (((u_int32_t)buffer[2]) << 8) |
642 ((u_int32_t)buffer[3]),
643 (int64_t)(((u_int32_t)buffer[4]) << 24) |
644 (((u_int32_t)buffer[5]) << 16) |
645 (((u_int32_t)buffer[6]) << 8) |
646 ((u_int32_t)buffer[7]));
// One byte acknowledgement sent back to the node.
// NOTE(review): confirm return_value[0] is assigned before this write.
647 char return_value[1];
649 write_socket(return_value, 1);
// Records a result code reported by a node.
//   buffer - byte 0 is the result code.
// The shared result is only overwritten while it is still zero, so the
// first nonzero code that is stored is kept.
// NOTE(review): no lock is taken around *server->result_return here, and
// the watchdog also calls this function -- confirm this is safe.
656 void RenderFarmServerThread::set_result(unsigned char *buffer)
658 //printf("RenderFarmServerThread::set_result %p\n", buffer);
659 if(!*server->result_return)
660 *server->result_return = buffer[0];
// Sends the current shared result code to the node as a single byte.
// The value is truncated to unsigned char by the assignment.
664 void RenderFarmServerThread::get_result()
666 unsigned char data[1];
667 data[0] = *server->result_return;
668 write_socket((char*)data, 1);
// Watchdog constructor.
//   server - server thread to act on. RenderFarmServerThread::start_loop()
//            passes itself here and 0 for `client`.
//   client - client thread to act on.
// NOTE(review): presumably exactly one of the two is non-null -- confirm.
// Creates the two conditions that begin_request(), end_request() and run()
// use to pass events to each other.
684 RenderFarmWatchdog::RenderFarmWatchdog(
685 RenderFarmServerThread *server,
686 RenderFarmClientThread *client)
689 this->server = server;
690 this->client = client;
691 next_request = new Condition(0, "RenderFarmWatchdog::next_request", 0);
692 request_complete = new Condition(0, "RenderFarmWatchdog::request_complete", 0);
// Destructor: releases both conditions, then frees request_complete.
// NOTE(review): the unlocks presumably wake run() so the thread can exit --
// confirm. Also confirm next_request is deleted as well.
696 RenderFarmWatchdog::~RenderFarmWatchdog()
699 next_request->unlock();
700 request_complete->unlock();
703 delete request_complete;
// Signals next_request, the condition that run() waits on before it starts
// its timed wait. read_socket() calls this before it begins reading.
706 void RenderFarmWatchdog::begin_request()
708 next_request->unlock();
// Signals request_complete, the condition that run() waits on with a
// timeout. read_socket() calls this once its read loop has finished.
711 void RenderFarmWatchdog::end_request()
713 request_complete->unlock();
// Watchdog thread body: waits for a request to begin (next_request), then
// waits for it to complete with a timeout derived from RENDERFARM_TIMEOUT.
// Two reactions are visible: sending SIGKILL to the client's pid, and
// recording a result on the server thread with set_result().
// NOTE(review): these presumably run only when the timed wait fails, and
// depend on which of client / server is set -- confirm.
716 void RenderFarmWatchdog::run()
720 next_request->lock("RenderFarmWatchdog::run");
// NOTE(review): the factor 1000000 suggests RENDERFARM_TIMEOUT is in
// seconds and timed_lock() takes microseconds -- confirm.
722 int result = request_complete->timed_lock(RENDERFARM_TIMEOUT * 1000000,
723 "RenderFarmWatchdog::run");
724 //printf("RenderFarmWatchdog::run 1 %d\n", result);
730 printf("RenderFarmWatchdog::run 1 killing client pid %d\n", client->pid);
731 kill(client->pid, SIGKILL);
736 printf("RenderFarmWatchdog::run 1 killing server thread %p\n", server);
// set_result() reads only buffer[0].
// NOTE(review): confirm buffer[0] is assigned before this call.
738 unsigned char buffer[4];
740 server->set_result(buffer);