nested clips, big rework and cleanup, sams new icons, leaks and tweaks
[goodguy/history.git] / cinelerra-5.1 / cinelerra / renderfarm.C
1
2 /*
3  * CINELERRA
4  * Copyright (C) 2008 Adam Williams <broadcast at earthling dot net>
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  *
20  */
21
22 #include "asset.h"
23 #include "bcsignals.h"
24 #include "brender.h"
25 #include "clip.h"
26 #include "condition.h"
27 #include "bchash.h"
28 #include "edl.h"
29 #include "filesystem.h"
30 #include "filexml.h"
31 #include "language.h"
32 #include "mutex.h"
33 #include "mwindow.h"
34 #include "packagedispatcher.h"
35 #include "preferences.h"
36 #include "render.h"
37 #include "renderfarm.h"
38 #include "renderfarmclient.h"
39 #include "bctimer.h"
40 #include "transportque.h"
41
42
43 #include <arpa/inet.h>
44 #include <errno.h>
45 #include <fcntl.h>
46 #include <netdb.h>
47 #include <netinet/in.h>
48 #include <signal.h>
49 #include <stdio.h>
50 #include <string.h>
51 #include <sys/socket.h>
52 #include <sys/types.h>
53 #include <sys/un.h>
54 #include <unistd.h>
55
56
57
58
59 RenderFarmServer::RenderFarmServer(
60         MWindow *mwindow,
61         PackageDispatcher *packages,
62         Preferences *preferences,
63         int use_local_rate,
64         int *result_return,
65         int64_t *total_return,
66         Mutex *total_return_lock,
67         Asset *default_asset,
68         EDL *edl,
69         BRender *brender)
70 {
71         this->mwindow = mwindow;
72         this->packages = packages;
73         this->preferences = preferences;
74         this->use_local_rate = use_local_rate;
75         this->result_return = result_return;
76         this->total_return = total_return;
77         this->total_return_lock = total_return_lock;
78         this->default_asset = default_asset;
79         this->edl = edl;
80         this->brender = brender;
81         client_lock = new Mutex("RenderFarmServer::client_lock");
82 }
83
84 RenderFarmServer::~RenderFarmServer()
85 {
86         clients.remove_all_objects();
87         delete client_lock;
88 }
89
90 // Open connections to clients.
91 int RenderFarmServer::start_clients()
92 {
93         int result = 0;
94
95         for( int i=0; i<preferences->get_enabled_nodes() && !result; ++i ) {
96                 client_lock->lock("RenderFarmServer::start_clients");
97                 RenderFarmServerThread *client = new RenderFarmServerThread(this, i);
98                 clients.append(client);
99
100                 result = client->start_loop();
101                 client_lock->unlock();
102         }
103
104         return result;
105 }
106
107 // The render farm must wait for all the clients to finish.
108 int RenderFarmServer::wait_clients()
109 {
110 //printf("RenderFarmServer::wait_clients 1\n");
111         clients.remove_all_objects();
112 //printf("RenderFarmServer::wait_clients 2\n");
113         return 0;
114 }
115
116 int RenderFarmServer::active_clients()
117 {
118         int n = 0;
119         for( int i=0; i<clients.size(); ++i )
120                 if( clients[i]->running() ) ++n;
121         return n;
122 }
123
124 // Waits for requests from every client.
125 // Joins when the client is finished.
126 RenderFarmServerThread::RenderFarmServerThread(RenderFarmServer *server,
127         int number)
128  : Thread(1, 0, 0)
129 {
130         this->server = server;
131         this->number = number;
132         socket_fd = -1;
133         frames_per_second = 0;
134         watchdog = 0;
135         buffer = 0;
136         datagram = 0;
137         Thread::set_synchronous(1);
138 }
139
140
141
142 RenderFarmServerThread::~RenderFarmServerThread()
143 {
144         Thread::join();
145         if( socket_fd >= 0 ) close(socket_fd);
146         delete watchdog;
147         delete [] buffer;
148         delete [] datagram;
149 }
150
151
152 int RenderFarmServerThread::open_client(const char *hostname, int port)
153 {
154         int socket_fd = -1;
155         int result = 0;
156
157 // Open file for master node
158         if( hostname[0] == '/' ) {
159                 if( (socket_fd = socket(PF_UNIX, SOCK_STREAM, 0)) < 0 ) {
160                         perror(_("RenderFarmServerThread::open_client: socket\n"));
161                         result = 1;
162                 }
163                 else {
164                         struct sockaddr_un addr;
165                         addr.sun_family = AF_FILE;
166                         strcpy(addr.sun_path, hostname);
167                         int size = (offsetof(struct sockaddr_un, sun_path) +
168                                 strlen(hostname) + 1);
169
170 // The master node is always created by BRender.  Keep trying for 30 seconds.
171
172 #define ATTEMPT_DELAY 100000
173                         int done = 0;
174                         int attempt = 0;
175
176                         do
177                         {
178                                 if( connect(socket_fd, (struct sockaddr*)&addr, size) < 0 ) {
179                                         attempt++;
180                                         if( attempt > 30000000 / ATTEMPT_DELAY ) {
181                                                 fprintf(stderr, _("RenderFarmServerThread::open_client: %s: %s\n"),
182                                                         hostname,
183                                                         strerror(errno));
184                                                 result = 1;
185                                         }
186                                         else
187                                                 usleep(ATTEMPT_DELAY);
188                                 }
189                                 else
190                                         done = 1;
191                         }while(!result && !done);
192                 }
193         }
194         else
195 // Open socket
196         {
197                 if( (socket_fd = socket(PF_INET, SOCK_STREAM, 0)) < 0 ) {
198                         perror(_("RenderFarmServerThread::start_loop: socket"));
199                         result = 1;
200                 }
201                 else {
202 // Open port
203                         struct sockaddr_in addr;
204                         struct hostent *hostinfo;
205                         addr.sin_family = AF_INET;
206                         addr.sin_port = htons(port);
207                         hostinfo = gethostbyname(hostname);
208                         if( hostinfo == NULL ) {
209                         fprintf(stderr, _("RenderFarmServerThread::open_client: unknown host %s.\n"),
210                                         hostname);
211                         result = 1;
212                 }
213                         else {
214                                 addr.sin_addr = *(struct in_addr *) hostinfo->h_addr;
215
216                                 if( connect(socket_fd, (struct sockaddr*)&addr, sizeof(addr)) < 0 ) {
217                                         fprintf(stderr, _("RenderFarmServerThread::open_client: %s: %s\n"),
218                                                 hostname,
219                                                 strerror(errno));
220                                         result = 1;
221                                 }
222                         }
223                 }
224         }
225
226         if( result ) socket_fd = -1;
227
228         return socket_fd;
229 }
230
231 int RenderFarmServerThread::start_loop()
232 {
233         int result = 0;
234         socket_fd = open_client(server->preferences->get_node_hostname(number),
235                 server->preferences->get_node_port(number));
236         if( socket_fd < 0 ) result = 1;
237
238         if( !result ) {
239                 int watchdog_timeout = server->preferences->renderfarm_watchdog_timeout;
240                 if( watchdog_timeout > 0 ) {
241                         watchdog = new RenderFarmWatchdog(watchdog_timeout, this, 0);
242                         watchdog->start();
243                 }
244         }
245
246         if( !result ) Thread::start();
247
248         return result;
249 }
250
251
252
253
254
255
256
257
258
259
260
261 int64_t RenderFarmServerThread::read_int64(int *error)
262 {
263         int temp = 0;
264         if( !error ) error = &temp;
265
266         unsigned char data[sizeof(int64_t)];
267         *error = (read_socket((char*)data, sizeof(int64_t)) !=
268                 sizeof(int64_t));
269
270 // Make it return 1 if error so it can be used to read a result code from the
271 // server.
272         int64_t result = 1;
273         if( !*error ) {
274                 result = (((int64_t)data[0]) << 56) |
275                         (((uint64_t)data[1]) << 48) |
276                         (((uint64_t)data[2]) << 40) |
277                         (((uint64_t)data[3]) << 32) |
278                         (((uint64_t)data[4]) << 24) |
279                         (((uint64_t)data[5]) << 16) |
280                         (((uint64_t)data[6]) << 8)  |
281                         data[7];
282         }
283         return result;
284 }
285
286 int RenderFarmServerThread::write_int64(int64_t value)
287 {
288         unsigned char data[sizeof(int64_t)];
289         data[0] = (value >> 56) & 0xff;
290         data[1] = (value >> 48) & 0xff;
291         data[2] = (value >> 40) & 0xff;
292         data[3] = (value >> 32) & 0xff;
293         data[4] = (value >> 24) & 0xff;
294         data[5] = (value >> 16) & 0xff;
295         data[6] = (value >> 8) & 0xff;
296         data[7] = value & 0xff;
297         return (write_socket((char*)data, sizeof(int64_t)) !=
298                 sizeof(int64_t));
299 }
300
301
302
303 int RenderFarmServerThread::read_socket(char *data, int len)
304 {
305         int bytes_read = 0;
306         int offset = 0;
307 //printf("RenderFarmServerThread::read_socket 1\n");
308         if( watchdog )
309                 watchdog->begin_request();
310         while(len > 0 && bytes_read >= 0)
311         {
312                 enable_cancel();
313                 bytes_read = read(socket_fd, data + offset, len);
314                 disable_cancel();
315                 if( bytes_read > 0 ) {
316                         len -= bytes_read;
317                         offset += bytes_read;
318                 }
319                 else
320                 if( bytes_read < 0 )
321                         break;
322         }
323         if( watchdog )
324                 watchdog->end_request();
325 //printf("RenderFarmServerThread::read_socket 10\n");
326
327         return offset;
328 }
329
330 int RenderFarmServerThread::write_socket(char *data, int len)
331 {
332 //printf("RenderFarmServerThread::write_socket 1\n");
333         int result = write(socket_fd, data, len);
334 //printf("RenderFarmServerThread::write_socket 10\n");
335
336         return result;
337 }
338
339 void RenderFarmServerThread::reallocate_buffer(int size)
340 {
341         if( buffer && buffer_allocated < size ) {
342                 delete [] buffer;
343                 buffer = 0;
344         }
345
346         if( !buffer && size ) {
347                 buffer = new unsigned char[size];
348                 buffer_allocated = size;
349         }
350 }
351
352 void RenderFarmServerThread::run()
353 {
354 // Wait for requests
355         unsigned char header[5];
356         int done = 0;
357         int bytes_read = 0;
358
359
360         buffer = 0;
361         buffer_allocated = 0;
362 //      fs_server = new RenderFarmFSServer(this);
363 //      fs_server->initialize();
364
365
366 // Send command to run package renderer.
367         write_int64(RENDERFARM_PACKAGES);
368
369
370
371         while(!done) {
372
373 // Wait for requests.
374 // Requests consist of request ID's and accompanying buffers.
375 // Get request ID.
376                 bytes_read = read_socket((char*)header, 5);
377 //printf("RenderFarmServerThread::run 1\n");
378                 if( bytes_read != 5 ) {
379                         done = 1;
380                         continue;
381                 }
382
383                 int request_id = header[0];
384                 int64_t request_size = (((u_int32_t)header[1]) << 24) |
385                                         (((u_int32_t)header[2]) << 16) |
386                                         (((u_int32_t)header[3]) << 8)  |
387                                         (u_int32_t)header[4];
388
389                 reallocate_buffer(request_size);
390
391 // Get accompanying buffer
392                 bytes_read = read_socket((char*)buffer, request_size);
393
394 //printf("RenderFarmServerThread::run 2 %d %jd %d\n", request_id, request_size, bytes_read);
395                 if( bytes_read != request_size ) {
396                         done = 1;
397                         continue;
398                 }
399 //printf("RenderFarmServerThread::run 3\n");
400
401                 switch( request_id ) {
402                 case RENDERFARM_PREFERENCES:
403                         send_preferences();
404                         break;
405
406                 case RENDERFARM_ASSET:
407                         send_asset();
408                         break;
409
410                 case RENDERFARM_EDL:
411                         send_edl();
412                         break;
413
414                 case RENDERFARM_PACKAGE:
415                         send_package(buffer);
416                         break;
417
418                 case RENDERFARM_PROGRESS:
419                         set_progress(buffer);
420                         break;
421
422                 case RENDERFARM_SET_RESULT:
423                         set_result(buffer);
424                         break;
425
426                 case RENDERFARM_SET_VMAP:
427                         set_video_map(buffer);
428                         break;
429
430                 case RENDERFARM_GET_RESULT:
431                         get_result();
432                         break;
433
434                 case RENDERFARM_DONE:
435 //printf("RenderFarmServerThread::run 10\n");
436                         done = 1;
437                         break;
438
439                 case RENDERFARM_KEEPALIVE:
440                         break;
441
442                 default:
443 //                      if( fs_server->handle_request(request_id, request_size, (unsigned char*)buffer) ) break;
444                         printf(_("RenderFarmServerThread::run: unknown request %02x\n"), request_id);
445                         break;
446                 }
447 //printf("RenderFarmServerThread::run 10 %d %jd\n", request_id, request_size);
448         }
449 //printf("RenderFarmServerThread::run 20\n");
450
451 // Don't let watchdog kill the entire renderfarm when a client finishes normally.
452         delete watchdog;  watchdog = 0;
453 //      delete fs_server;
454 }
455
456 void RenderFarmServerThread::write_string(char *string)
457 {
458         int i, len;
459         i = 0;
460
461         len = strlen(string) + 1;
462         datagram = new char[len + 4];
463         STORE_INT32(len);
464         memcpy(datagram + i, string, len);
465         write_socket((char*)datagram, len + 4);
466 //printf("RenderFarmServerThread::write_string %02x%02x%02x%02x\n",
467 //      datagram[0], datagram[1], datagram[2], datagram[3]);
468
469         delete [] datagram;
470         datagram = 0;
471 }
472
473 void RenderFarmServerThread::send_preferences()
474 {
475         BC_Hash defaults;
476         char *string;
477
478         server->preferences->save_defaults(&defaults);
479         defaults.save_string(string);
480         write_string(string);
481         free(string);
482 }
483
484 void RenderFarmServerThread::send_asset()
485 {
486         BC_Hash defaults;
487         char *string1;
488
489 // The asset must be sent in two segments.
490 // One segment is stored in the EDL and contains decoding information.
491 // One segment is stored in the asset and contains encoding information.
492         server->default_asset->save_defaults(&defaults,
493                 0,
494                 1,
495                 1,
496                 1,
497                 1,
498                 1);
499         defaults.save_string(string1);
500         FileXML file;
501         server->default_asset->write(&file, 0, 0);
502         file.terminate_string();
503
504         write_string(string1);
505         write_string(file.string());
506         free(string1);
507 }
508
509
510 void RenderFarmServerThread::send_edl()
511 {
512         FileXML file;
513
514 // Save the XML
515         server->edl->save_xml(&file, 0);
516         file.terminate_string();
517 //printf("RenderFarmServerThread::send_edl\n%s\n\n", file.string);
518
519         write_string(file.string());
520 //printf("RenderFarmServerThread::send_edl 2\n");
521 }
522
523
524 void RenderFarmServerThread::send_package(unsigned char *buffer)
525 {
526         this->frames_per_second = (double)((((u_int32_t)buffer[0]) << 24) |
527                 (((u_int32_t)buffer[1]) << 16) |
528                 (((u_int32_t)buffer[2]) << 8)  |
529                 ((u_int32_t)buffer[3])) /
530                 65536.0;
531
532 //printf("RenderFarmServerThread::send_package 1 %f\n", frames_per_second);
533         RenderPackage *package =
534                 server->packages->get_package(frames_per_second,
535                         number,
536                         server->use_local_rate);
537
538 //printf("RenderFarmServerThread::send_package 2\n");
539         datagram = new char[BCTEXTLEN];
540
541 // No more packages
542         if( !package ) {
543 //printf("RenderFarmServerThread::send_package 1\n");
544                 datagram[0] = datagram[1] = datagram[2] = datagram[3] = 0;
545                 write_socket(datagram, 4);
546         }
547         else
548 // Encode package
549         {
550 //printf("RenderFarmServerThread::send_package 10\n");
551                 int i = 4;
552                 strcpy(&datagram[i], package->path);
553                 i += strlen(package->path);
554                 datagram[i++] = 0;
555
556                 STORE_INT32(package->audio_start);
557                 STORE_INT32(package->audio_end);
558                 STORE_INT32(package->video_start);
559                 STORE_INT32(package->video_end);
560                 int use_brender = (server->brender ? 1 : 0);
561                 STORE_INT32(use_brender);
562                 STORE_INT32(package->audio_do);
563                 STORE_INT32(package->video_do);
564
565                 int len = i;
566                 i = 0;
567                 STORE_INT32(len - 4);
568
569                 write_socket(datagram, len);
570         }
571         delete [] datagram;
572         datagram = 0;
573 }
574
575
576 void RenderFarmServerThread::set_progress(unsigned char *buffer)
577 {
578         server->total_return_lock->lock("RenderFarmServerThread::set_progress");
579         *server->total_return += (int64_t)(((u_int32_t)buffer[0]) << 24) |
580                 (((u_int32_t)buffer[1]) << 16) |
581                 (((u_int32_t)buffer[2]) << 8)  |
582                 ((u_int32_t)buffer[3]);
583         frames_per_second = (double)((((u_int32_t)buffer[4]) << 24) |
584                 (((u_int32_t)buffer[5]) << 16) |
585                 (((u_int32_t)buffer[6]) << 8)  |
586                 ((u_int32_t)buffer[7])) /
587                 65536.0;
588         server->total_return_lock->unlock();
589
590         server->preferences->set_rate(frames_per_second, number);
591
592 // This locks the preferences
593         if( server->mwindow ) server->mwindow->preferences->copy_rates_from(
594                 server->preferences);
595 }
596
597 int RenderFarmServerThread::set_video_map(unsigned char *buffer)
598 {
599         if( server->brender ) {
600                 server->brender->set_video_map((int64_t)(((u_int32_t)buffer[0]) << 24) |
601                                                         (((u_int32_t)buffer[1]) << 16) |
602                                                         (((u_int32_t)buffer[2]) << 8)  |
603                                                         ((u_int32_t)buffer[3]),
604                                                         (int64_t)(((u_int32_t)buffer[4]) << 24) |
605                                                         (((u_int32_t)buffer[5]) << 16) |
606                                                         (((u_int32_t)buffer[6]) << 8)  |
607                                                         ((u_int32_t)buffer[7]));
608                 char return_value[1];
609                 return_value[0] = 0;
610                 write_socket(return_value, 1);
611                 return 0;
612         }
613         return 1;
614 }
615
616
617 void RenderFarmServerThread::set_result(unsigned char *buffer)
618 {
619 //printf("RenderFarmServerThread::set_result %p\n", buffer);
620         if( !*server->result_return )
621                 *server->result_return = buffer[0];
622 }
623
624
625 void RenderFarmServerThread::get_result()
626 {
627         unsigned char data[1];
628         data[0] = *server->result_return;
629         write_socket((char*)data, 1);
630 }
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645 RenderFarmWatchdog::RenderFarmWatchdog(int timeout_secs,
646         RenderFarmServerThread *server,
647         RenderFarmClientThread *client)
648  : Thread(1, 0, 0)
649 {
650         this->timeout_usecs = timeout_secs * 1000000;
651         this->server = server;
652         this->client = client;
653         next_request = new Condition(0, "RenderFarmWatchdog::next_request", 0);
654         request_complete = new Condition(0, "RenderFarmWatchdog::request_complete", 0);
655         done = 0;
656 }
657
658 RenderFarmWatchdog::~RenderFarmWatchdog()
659 {
660         done = 1;
661         next_request->unlock();
662         request_complete->unlock();
663         join();
664         delete next_request;
665         delete request_complete;
666 }
667
668 void RenderFarmWatchdog::begin_request()
669 {
670         next_request->unlock();
671 }
672
673 void RenderFarmWatchdog::end_request()
674 {
675         request_complete->unlock();
676 }
677
678 void RenderFarmWatchdog::run()
679 {
680         while(!done) {
681                 next_request->lock("RenderFarmWatchdog::run");
682                 int result = request_complete->timed_lock(timeout_usecs, "RenderFarmWatchdog::run");
683 //printf("RenderFarmWatchdog::run 1 %d\n", result);
684
685                 if( result ) {
686                         if( client ) {
687                                 printf("RenderFarmWatchdog::run 1 killing client pid %d\n", client->pid);
688                                 kill(client->pid, SIGKILL);
689                         }
690                         else if( server ) {
691                                 printf("RenderFarmWatchdog::run 1 killing server thread %p\n", server);
692                                 server->cancel();
693                                 unsigned char buffer[4];
694                                 buffer[0] = 1;
695                                 server->set_result(buffer);
696                         }
697
698                         done = 1;
699                 }
700         }
701 }
702
703
704
705