Credit Andrew - improve in-tree documentation
[goodguy/cinelerra.git] / cinelerra / renderfarm.C
1
2 /*
3  * CINELERRA
4  * Copyright (C) 2008 Adam Williams <broadcast at earthling dot net>
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  *
20  */
21
22 #include "asset.h"
23 #include "bcsignals.h"
24 #include "brender.h"
25 #include "clip.h"
26 #include "condition.h"
27 #include "bchash.h"
28 #include "edl.h"
29 #include "filesystem.h"
30 #include "filexml.h"
31 #include "language.h"
32 #include "mutex.h"
33 #include "mwindow.h"
34 #include "packagedispatcher.h"
35 #include "preferences.h"
36 #include "render.h"
37 #include "renderfarm.h"
38 #include "renderfarmclient.h"
39 #include "bctimer.h"
40 #include "transportque.h"
41
42
43 #include <arpa/inet.h>
44 #include <errno.h>
45 #include <fcntl.h>
46 #include <netdb.h>
47 #include <netinet/in.h>
48 #include <signal.h>
49 #include <stdio.h>
50 #include <string.h>
51 #include <sys/socket.h>
52 #include <sys/types.h>
53 #include <sys/un.h>
54 #include <unistd.h>
55
56
57 #ifndef AF_FILE
58 #define AF_FILE AF_LOCAL
59 #endif
60
61
62 RenderFarmServer::RenderFarmServer(
63         MWindow *mwindow,
64         PackageDispatcher *packages,
65         Preferences *preferences,
66         int use_local_rate,
67         int *result_return,
68         int64_t *total_return,
69         Mutex *total_return_lock,
70         Asset *default_asset,
71         EDL *edl,
72         BRender *brender)
73 {
74         this->mwindow = mwindow;
75         this->packages = packages;
76         this->preferences = preferences;
77         this->use_local_rate = use_local_rate;
78         this->result_return = result_return;
79         this->total_return = total_return;
80         this->total_return_lock = total_return_lock;
81         this->default_asset = default_asset;
82         this->edl = edl;
83         this->brender = brender;
84         client_lock = new Mutex("RenderFarmServer::client_lock");
85 }
86
87 RenderFarmServer::~RenderFarmServer()
88 {
89         clients.remove_all_objects();
90         delete client_lock;
91 }
92
93 // Open connections to clients.
94 int RenderFarmServer::start_clients()
95 {
96         int result = 0;
97
98         for( int i=0; i<preferences->get_enabled_nodes() && !result; ++i ) {
99                 client_lock->lock("RenderFarmServer::start_clients");
100                 RenderFarmServerThread *client = new RenderFarmServerThread(this, i);
101                 clients.append(client);
102
103                 result = client->start_loop();
104                 client_lock->unlock();
105         }
106
107         return result;
108 }
109
110 // The render farm must wait for all the clients to finish.
111 int RenderFarmServer::wait_clients()
112 {
113 //printf("RenderFarmServer::wait_clients 1\n");
114         clients.remove_all_objects();
115 //printf("RenderFarmServer::wait_clients 2\n");
116         return 0;
117 }
118
119 int RenderFarmServer::active_clients()
120 {
121         int n = 0;
122         for( int i=0; i<clients.size(); ++i )
123                 if( clients[i]->running() ) ++n;
124         return n;
125 }
126
127 // Waits for requests from every client.
128 // Joins when the client is finished.
129 RenderFarmServerThread::RenderFarmServerThread(RenderFarmServer *server,
130         int number)
131  : Thread(1, 0, 0)
132 {
133         this->server = server;
134         this->number = number;
135         socket_fd = -1;
136         frames_per_second = 0;
137         watchdog = 0;
138         buffer = 0;
139         datagram = 0;
140         Thread::set_synchronous(1);
141 }
142
143
144
145 RenderFarmServerThread::~RenderFarmServerThread()
146 {
147         Thread::join();
148         if( socket_fd >= 0 ) close(socket_fd);
149         delete watchdog;
150         delete [] buffer;
151         delete [] datagram;
152 }
153
154
155 int RenderFarmServerThread::open_client(const char *hostname, int port)
156 {
157         int socket_fd = -1;
158         int result = 0;
159
160 // Open file for master node
161         if( hostname[0] == '/' ) {
162                 if( (socket_fd = socket(PF_UNIX, SOCK_STREAM, 0)) < 0 ) {
163                         perror(_("RenderFarmServerThread::open_client: socket\n"));
164                         result = 1;
165                 }
166                 else {
167                         struct sockaddr_un addr;
168                         addr.sun_family = AF_FILE;
169                         strcpy(addr.sun_path, hostname);
170                         int size = (offsetof(struct sockaddr_un, sun_path) +
171                                 strlen(hostname) + 1);
172
173 // The master node is always created by BRender.  Keep trying for 30 seconds.
174
175 #define ATTEMPT_DELAY 100000
176                         int done = 0;
177                         int attempt = 0;
178
179                         do
180                         {
181                                 if( connect(socket_fd, (struct sockaddr*)&addr, size) < 0 ) {
182                                         attempt++;
183                                         if( attempt > 30000000 / ATTEMPT_DELAY ) {
184                                                 fprintf(stderr, _("RenderFarmServerThread::open_client: %s: %s\n"),
185                                                         hostname,
186                                                         strerror(errno));
187                                                 result = 1;
188                                         }
189                                         else
190                                                 usleep(ATTEMPT_DELAY);
191                                 }
192                                 else
193                                         done = 1;
194                         }while(!result && !done);
195                 }
196         }
197         else
198 // Open socket
199         {
200                 if( (socket_fd = socket(PF_INET, SOCK_STREAM, 0)) < 0 ) {
201                         perror(_("RenderFarmServerThread::start_loop: socket"));
202                         result = 1;
203                 }
204                 else {
205 // Open port
206                         struct sockaddr_in addr;
207                         struct hostent *hostinfo;
208                         addr.sin_family = AF_INET;
209                         addr.sin_port = htons(port);
210                         hostinfo = gethostbyname(hostname);
211                         if( hostinfo == NULL ) {
212                         fprintf(stderr, _("RenderFarmServerThread::open_client: unknown host %s.\n"),
213                                         hostname);
214                         result = 1;
215                 }
216                         else {
217                                 addr.sin_addr = *(struct in_addr *) hostinfo->h_addr;
218
219                                 if( connect(socket_fd, (struct sockaddr*)&addr, sizeof(addr)) < 0 ) {
220                                         fprintf(stderr, _("RenderFarmServerThread::open_client: %s: %s\n"),
221                                                 hostname,
222                                                 strerror(errno));
223                                         result = 1;
224                                 }
225                         }
226                 }
227         }
228
229         if( result ) socket_fd = -1;
230
231         return socket_fd;
232 }
233
234 int RenderFarmServerThread::start_loop()
235 {
236         int result = 0;
237         socket_fd = open_client(server->preferences->get_node_hostname(number),
238                 server->preferences->get_node_port(number));
239         if( socket_fd < 0 ) result = 1;
240
241         if( !result ) {
242                 int watchdog_timeout = server->preferences->renderfarm_watchdog_timeout;
243                 if( watchdog_timeout > 0 ) {
244                         watchdog = new RenderFarmWatchdog(watchdog_timeout, this, 0);
245                         watchdog->start();
246                 }
247         }
248
249         if( !result ) Thread::start();
250
251         return result;
252 }
253
254
255
256
257
258
259
260
261
262
263
264 int64_t RenderFarmServerThread::read_int64(int *error)
265 {
266         int temp = 0;
267         if( !error ) error = &temp;
268
269         unsigned char data[sizeof(int64_t)];
270         *error = (read_socket((char*)data, sizeof(int64_t)) !=
271                 sizeof(int64_t));
272
273 // Make it return 1 if error so it can be used to read a result code from the
274 // server.
275         int64_t result = 1;
276         if( !*error ) {
277                 result = (((int64_t)data[0]) << 56) |
278                         (((uint64_t)data[1]) << 48) |
279                         (((uint64_t)data[2]) << 40) |
280                         (((uint64_t)data[3]) << 32) |
281                         (((uint64_t)data[4]) << 24) |
282                         (((uint64_t)data[5]) << 16) |
283                         (((uint64_t)data[6]) << 8)  |
284                         data[7];
285         }
286         return result;
287 }
288
289 int RenderFarmServerThread::write_int64(int64_t value)
290 {
291         unsigned char data[sizeof(int64_t)];
292         data[0] = (value >> 56) & 0xff;
293         data[1] = (value >> 48) & 0xff;
294         data[2] = (value >> 40) & 0xff;
295         data[3] = (value >> 32) & 0xff;
296         data[4] = (value >> 24) & 0xff;
297         data[5] = (value >> 16) & 0xff;
298         data[6] = (value >> 8) & 0xff;
299         data[7] = value & 0xff;
300         return (write_socket((char*)data, sizeof(int64_t)) !=
301                 sizeof(int64_t));
302 }
303
304
305
306 int RenderFarmServerThread::read_socket(char *data, int len)
307 {
308         int bytes_read = 0;
309         int offset = 0;
310 //printf("RenderFarmServerThread::read_socket 1\n");
311         if( watchdog )
312                 watchdog->begin_request();
313         while(len > 0 && bytes_read >= 0)
314         {
315                 enable_cancel();
316                 bytes_read = read(socket_fd, data + offset, len);
317                 disable_cancel();
318                 if( bytes_read > 0 ) {
319                         len -= bytes_read;
320                         offset += bytes_read;
321                 }
322                 else
323                 if( bytes_read < 0 )
324                         break;
325         }
326         if( watchdog )
327                 watchdog->end_request();
328 //printf("RenderFarmServerThread::read_socket 10\n");
329
330         return offset;
331 }
332
333 int RenderFarmServerThread::write_socket(char *data, int len)
334 {
335 //printf("RenderFarmServerThread::write_socket 1\n");
336         int result = write(socket_fd, data, len);
337 //printf("RenderFarmServerThread::write_socket 10\n");
338
339         return result;
340 }
341
342 void RenderFarmServerThread::reallocate_buffer(int size)
343 {
344         if( buffer && buffer_allocated < size ) {
345                 delete [] buffer;
346                 buffer = 0;
347         }
348
349         if( !buffer && size ) {
350                 buffer = new unsigned char[size];
351                 buffer_allocated = size;
352         }
353 }
354
355 void RenderFarmServerThread::run()
356 {
357 // Wait for requests
358         unsigned char header[5];
359         int done = 0;
360         int bytes_read = 0;
361
362
363         buffer = 0;
364         buffer_allocated = 0;
365 //      fs_server = new RenderFarmFSServer(this);
366 //      fs_server->initialize();
367
368
369 // Send command to run package renderer.
370         write_int64(RENDERFARM_PACKAGES);
371
372
373
374         while(!done) {
375
376 // Wait for requests.
377 // Requests consist of request ID's and accompanying buffers.
378 // Get request ID.
379                 bytes_read = read_socket((char*)header, 5);
380 //printf("RenderFarmServerThread::run 1\n");
381                 if( bytes_read != 5 ) {
382                         done = 1;
383                         continue;
384                 }
385
386                 int request_id = header[0];
387                 int64_t request_size = (((u_int32_t)header[1]) << 24) |
388                                         (((u_int32_t)header[2]) << 16) |
389                                         (((u_int32_t)header[3]) << 8)  |
390                                         (u_int32_t)header[4];
391
392                 reallocate_buffer(request_size);
393
394 // Get accompanying buffer
395                 bytes_read = read_socket((char*)buffer, request_size);
396
397 //printf("RenderFarmServerThread::run 2 %d %jd %d\n", request_id, request_size, bytes_read);
398                 if( bytes_read != request_size ) {
399                         done = 1;
400                         continue;
401                 }
402 //printf("RenderFarmServerThread::run 3\n");
403
404                 switch( request_id ) {
405                 case RENDERFARM_PREFERENCES:
406                         send_preferences();
407                         break;
408
409                 case RENDERFARM_ASSET:
410                         send_asset();
411                         break;
412
413                 case RENDERFARM_EDL:
414                         send_edl();
415                         break;
416
417                 case RENDERFARM_PACKAGE:
418                         send_package(buffer);
419                         break;
420
421                 case RENDERFARM_PROGRESS:
422                         set_progress(buffer);
423                         break;
424
425                 case RENDERFARM_SET_RESULT:
426                         set_result(buffer);
427                         break;
428
429                 case RENDERFARM_SET_VMAP:
430                         set_video_map(buffer);
431                         break;
432
433                 case RENDERFARM_GET_RESULT:
434                         get_result();
435                         break;
436
437                 case RENDERFARM_DONE:
438 //printf("RenderFarmServerThread::run 10\n");
439                         done = 1;
440                         break;
441
442                 case RENDERFARM_KEEPALIVE:
443                         break;
444
445                 default:
446 //                      if( fs_server->handle_request(request_id, request_size, (unsigned char*)buffer) ) break;
447                         printf(_("RenderFarmServerThread::run: unknown request %02x\n"), request_id);
448                         break;
449                 }
450 //printf("RenderFarmServerThread::run 10 %d %jd\n", request_id, request_size);
451         }
452 //printf("RenderFarmServerThread::run 20\n");
453
454 // Don't let watchdog kill the entire renderfarm when a client finishes normally.
455         delete watchdog;  watchdog = 0;
456 //      delete fs_server;
457 }
458
459 void RenderFarmServerThread::write_string(char *string)
460 {
461         int i, len;
462         i = 0;
463
464         len = strlen(string) + 1;
465         datagram = new char[len + 4];
466         STORE_INT32(len);
467         memcpy(datagram + i, string, len);
468         write_socket((char*)datagram, len + 4);
469 //printf("RenderFarmServerThread::write_string %02x%02x%02x%02x\n",
470 //      datagram[0], datagram[1], datagram[2], datagram[3]);
471
472         delete [] datagram;
473         datagram = 0;
474 }
475
476 void RenderFarmServerThread::send_preferences()
477 {
478         BC_Hash defaults;
479         char *string;
480
481         server->preferences->save_defaults(&defaults);
482         defaults.save_string(string);
483         write_string(string);
484         free(string);
485 }
486
487 void RenderFarmServerThread::send_asset()
488 {
489         BC_Hash defaults;
490         char *string1;
491
492 // The asset must be sent in two segments.
493 // One segment is stored in the EDL and contains decoding information.
494 // One segment is stored in the asset and contains encoding information.
495         server->default_asset->save_defaults(&defaults,
496                 0,
497                 1,
498                 1,
499                 1,
500                 1,
501                 1);
502         defaults.save_string(string1);
503         FileXML file;
504         server->default_asset->write(&file, 0, 0);
505         file.terminate_string();
506
507         write_string(string1);
508         write_string(file.string());
509         free(string1);
510 }
511
512
513 void RenderFarmServerThread::send_edl()
514 {
515         FileXML file;
516
517 // Save the XML
518         server->edl->save_xml(&file, 0);
519         file.terminate_string();
520 //printf("RenderFarmServerThread::send_edl\n%s\n\n", file.string);
521
522         write_string(file.string());
523 //printf("RenderFarmServerThread::send_edl 2\n");
524 }
525
526
527 void RenderFarmServerThread::send_package(unsigned char *buffer)
528 {
529         this->frames_per_second = (double)((((u_int32_t)buffer[0]) << 24) |
530                 (((u_int32_t)buffer[1]) << 16) |
531                 (((u_int32_t)buffer[2]) << 8)  |
532                 ((u_int32_t)buffer[3])) /
533                 65536.0;
534
535 //printf("RenderFarmServerThread::send_package 1 %f\n", frames_per_second);
536         RenderPackage *package =
537                 server->packages->get_package(frames_per_second,
538                         number,
539                         server->use_local_rate);
540
541 //printf("RenderFarmServerThread::send_package 2\n");
542         datagram = new char[BCTEXTLEN];
543
544 // No more packages
545         if( !package ) {
546 //printf("RenderFarmServerThread::send_package 1\n");
547                 datagram[0] = datagram[1] = datagram[2] = datagram[3] = 0;
548                 write_socket(datagram, 4);
549         }
550         else
551 // Encode package
552         {
553 //printf("RenderFarmServerThread::send_package 10\n");
554                 int i = 4;
555                 strcpy(&datagram[i], package->path);
556                 i += strlen(package->path);
557                 datagram[i++] = 0;
558
559                 STORE_INT32(package->audio_start);
560                 STORE_INT32(package->audio_end);
561                 STORE_INT32(package->video_start);
562                 STORE_INT32(package->video_end);
563                 int use_brender = (server->brender ? 1 : 0);
564                 STORE_INT32(use_brender);
565                 STORE_INT32(package->audio_do);
566                 STORE_INT32(package->video_do);
567
568                 int len = i;
569                 i = 0;
570                 STORE_INT32(len - 4);
571
572                 write_socket(datagram, len);
573         }
574         delete [] datagram;
575         datagram = 0;
576 }
577
578
579 void RenderFarmServerThread::set_progress(unsigned char *buffer)
580 {
581         server->total_return_lock->lock("RenderFarmServerThread::set_progress");
582         *server->total_return += (int64_t)(((u_int32_t)buffer[0]) << 24) |
583                 (((u_int32_t)buffer[1]) << 16) |
584                 (((u_int32_t)buffer[2]) << 8)  |
585                 ((u_int32_t)buffer[3]);
586         frames_per_second = (double)((((u_int32_t)buffer[4]) << 24) |
587                 (((u_int32_t)buffer[5]) << 16) |
588                 (((u_int32_t)buffer[6]) << 8)  |
589                 ((u_int32_t)buffer[7])) /
590                 65536.0;
591         server->total_return_lock->unlock();
592
593         server->preferences->set_rate(frames_per_second, number);
594
595 // This locks the preferences
596         if( server->mwindow ) server->mwindow->preferences->copy_rates_from(
597                 server->preferences);
598 }
599
600 int RenderFarmServerThread::set_video_map(unsigned char *buffer)
601 {
602         if( server->brender ) {
603                 server->brender->set_video_map((int64_t)(((u_int32_t)buffer[0]) << 24) |
604                                                         (((u_int32_t)buffer[1]) << 16) |
605                                                         (((u_int32_t)buffer[2]) << 8)  |
606                                                         ((u_int32_t)buffer[3]),
607                                                         (int64_t)(((u_int32_t)buffer[4]) << 24) |
608                                                         (((u_int32_t)buffer[5]) << 16) |
609                                                         (((u_int32_t)buffer[6]) << 8)  |
610                                                         ((u_int32_t)buffer[7]));
611                 char return_value[1];
612                 return_value[0] = 0;
613                 write_socket(return_value, 1);
614                 return 0;
615         }
616         return 1;
617 }
618
619
620 void RenderFarmServerThread::set_result(unsigned char *buffer)
621 {
622 //printf("RenderFarmServerThread::set_result %p\n", buffer);
623         if( !*server->result_return )
624                 *server->result_return = buffer[0];
625 }
626
627
628 void RenderFarmServerThread::get_result()
629 {
630         unsigned char data[1];
631         data[0] = *server->result_return;
632         write_socket((char*)data, 1);
633 }
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648 RenderFarmWatchdog::RenderFarmWatchdog(int timeout_secs,
649         RenderFarmServerThread *server,
650         RenderFarmClientThread *client)
651  : Thread(1, 0, 0)
652 {
653         this->timeout_usecs = timeout_secs * 1000000;
654         this->server = server;
655         this->client = client;
656         next_request = new Condition(0, "RenderFarmWatchdog::next_request", 0);
657         request_complete = new Condition(0, "RenderFarmWatchdog::request_complete", 0);
658         done = 0;
659 }
660
661 RenderFarmWatchdog::~RenderFarmWatchdog()
662 {
663         done = 1;
664         next_request->unlock();
665         request_complete->unlock();
666         join();
667         delete next_request;
668         delete request_complete;
669 }
670
671 void RenderFarmWatchdog::begin_request()
672 {
673         next_request->unlock();
674 }
675
676 void RenderFarmWatchdog::end_request()
677 {
678         request_complete->unlock();
679 }
680
681 void RenderFarmWatchdog::run()
682 {
683         while(!done) {
684                 next_request->lock("RenderFarmWatchdog::run");
685                 int result = request_complete->timed_lock(timeout_usecs, "RenderFarmWatchdog::run");
686 //printf("RenderFarmWatchdog::run 1 %d\n", result);
687
688                 if( result ) {
689                         if( client ) {
690                                 printf("RenderFarmWatchdog::run 1 killing client pid %d\n", client->pid);
691                                 kill(client->pid, SIGKILL);
692                         }
693                         else if( server ) {
694                                 printf("RenderFarmWatchdog::run 1 killing server thread %p\n", server);
695                                 server->cancel();
696                                 unsigned char buffer[4];
697                                 buffer[0] = 1;
698                                 server->set_result(buffer);
699                         }
700
701                         done = 1;
702                 }
703         }
704 }
705
706
707
708