Credit Andrew - fix vorbis audio which was scratchy and ensure aging plugin does...
[goodguy/cinelerra.git] / cinelerra-5.1 / cinelerra / renderfarm.C
1
2 /*
3  * CINELERRA
4  * Copyright (C) 2008 Adam Williams <broadcast at earthling dot net>
5  * Copyright (C) 2003-2016 Cinelerra CV contributors
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
20  *
21  */
22
23 #include "asset.h"
24 #include "bcsignals.h"
25 #include "brender.h"
26 #include "clip.h"
27 #include "condition.h"
28 #include "bchash.h"
29 #include "edl.h"
30 #include "filesystem.h"
31 #include "filexml.h"
32 #include "language.h"
33 #include "mutex.h"
34 #include "mwindow.h"
35 #include "packagedispatcher.h"
36 #include "preferences.h"
37 #include "render.h"
38 #include "renderfarm.h"
39 #include "renderfarmclient.h"
40 #include "bctimer.h"
41 #include "transportque.h"
42
43
44 #include <arpa/inet.h>
45 #include <errno.h>
46 #include <fcntl.h>
47 #include <netdb.h>
48 #include <netinet/in.h>
49 #include <signal.h>
50 #include <stdio.h>
51 #include <string.h>
52 #include <sys/socket.h>
53 #include <sys/types.h>
54 #include <sys/un.h>
55 #include <unistd.h>
56
57
58 #ifndef AF_FILE
59 #define AF_FILE AF_LOCAL
60 #endif
61
62
63 RenderFarmServer::RenderFarmServer(
64         MWindow *mwindow,
65         PackageDispatcher *packages,
66         Preferences *preferences,
67         int use_local_rate,
68         int *result_return,
69         int64_t *total_return,
70         Mutex *total_return_lock,
71         Asset *default_asset,
72         EDL *edl,
73         BRender *brender)
74 {
75         this->mwindow = mwindow;
76         this->packages = packages;
77         this->preferences = preferences;
78         this->use_local_rate = use_local_rate;
79         this->result_return = result_return;
80         this->total_return = total_return;
81         this->total_return_lock = total_return_lock;
82         this->default_asset = default_asset;
83         this->edl = edl;
84         this->brender = brender;
85         client_lock = new Mutex("RenderFarmServer::client_lock");
86 }
87
88 RenderFarmServer::~RenderFarmServer()
89 {
90         clients.remove_all_objects();
91         delete client_lock;
92 }
93
94 // Open connections to clients.
95 int RenderFarmServer::start_clients()
96 {
97         int result = 0;
98
99         for( int i=0; i<preferences->get_enabled_nodes() && !result; ++i ) {
100                 client_lock->lock("RenderFarmServer::start_clients");
101                 RenderFarmServerThread *client = new RenderFarmServerThread(this, i);
102                 clients.append(client);
103
104                 result = client->start_loop();
105                 client_lock->unlock();
106         }
107
108         return result;
109 }
110
111 // The render farm must wait for all the clients to finish.
112 int RenderFarmServer::wait_clients()
113 {
114 //printf("RenderFarmServer::wait_clients 1\n");
115         clients.remove_all_objects();
116 //printf("RenderFarmServer::wait_clients 2\n");
117         return 0;
118 }
119
120 int RenderFarmServer::active_clients()
121 {
122         int n = 0;
123         for( int i=0; i<clients.size(); ++i )
124                 if( clients[i]->running() ) ++n;
125         return n;
126 }
127
128 // Waits for requests from every client.
129 // Joins when the client is finished.
130 RenderFarmServerThread::RenderFarmServerThread(RenderFarmServer *server,
131         int number)
132  : Thread(1, 0, 0)
133 {
134         this->server = server;
135         this->number = number;
136         socket_fd = -1;
137         frames_per_second = 0;
138         watchdog = 0;
139         buffer = 0;
140         datagram = 0;
141         Thread::set_synchronous(1);
142 }
143
144
145
146 RenderFarmServerThread::~RenderFarmServerThread()
147 {
148         Thread::join();
149         if( socket_fd >= 0 ) close(socket_fd);
150         delete watchdog;
151         delete [] buffer;
152         delete [] datagram;
153 }
154
155
156 int RenderFarmServerThread::open_client(const char *hostname, int port)
157 {
158         int socket_fd = -1;
159         int result = 0;
160
161 // Open file for master node
162         if( hostname[0] == '/' ) {
163                 if( (socket_fd = socket(PF_UNIX, SOCK_STREAM, 0)) < 0 ) {
164                         perror(_("RenderFarmServerThread::open_client: socket\n"));
165                         result = 1;
166                 }
167                 else {
168                         struct sockaddr_un addr;
169                         addr.sun_family = AF_FILE;
170                         strcpy(addr.sun_path, hostname);
171                         int size = (offsetof(struct sockaddr_un, sun_path) +
172                                 strlen(hostname) + 1);
173
174 // The master node is always created by BRender.  Keep trying for 30 seconds.
175
176 #define ATTEMPT_DELAY 100000
177                         int done = 0;
178                         int attempt = 0;
179
180                         do
181                         {
182                                 if( connect(socket_fd, (struct sockaddr*)&addr, size) < 0 ) {
183                                         attempt++;
184                                         if( attempt > 30000000 / ATTEMPT_DELAY ) {
185                                                 fprintf(stderr, _("RenderFarmServerThread::open_client: %s: %s\n"),
186                                                         hostname,
187                                                         strerror(errno));
188                                                 result = 1;
189                                         }
190                                         else
191                                                 usleep(ATTEMPT_DELAY);
192                                 }
193                                 else
194                                         done = 1;
195                         }while(!result && !done);
196                 }
197         }
198         else
199 // Open socket
200         {
201                 if( (socket_fd = socket(PF_INET, SOCK_STREAM, 0)) < 0 ) {
202                         perror(_("RenderFarmServerThread::start_loop: socket"));
203                         result = 1;
204                 }
205                 else {
206 // Open port
207                         struct sockaddr_in addr;
208                         struct hostent *hostinfo;
209                         addr.sin_family = AF_INET;
210                         addr.sin_port = htons(port);
211                         hostinfo = gethostbyname(hostname);
212                         if( hostinfo == NULL ) {
213                         fprintf(stderr, _("RenderFarmServerThread::open_client: unknown host %s.\n"),
214                                         hostname);
215                         result = 1;
216                 }
217                         else {
218                                 addr.sin_addr = *(struct in_addr *) hostinfo->h_addr;
219
220                                 if( connect(socket_fd, (struct sockaddr*)&addr, sizeof(addr)) < 0 ) {
221                                         fprintf(stderr, _("RenderFarmServerThread::open_client: %s: %s\n"),
222                                                 hostname,
223                                                 strerror(errno));
224                                         result = 1;
225                                 }
226                         }
227                 }
228         }
229
230         if( result ) socket_fd = -1;
231
232         return socket_fd;
233 }
234
235 int RenderFarmServerThread::start_loop()
236 {
237         int result = 0;
238         socket_fd = open_client(server->preferences->get_node_hostname(number),
239                 server->preferences->get_node_port(number));
240         if( socket_fd < 0 ) result = 1;
241
242         if( !result ) {
243                 int watchdog_timeout = server->preferences->renderfarm_watchdog_timeout;
244                 if( watchdog_timeout > 0 ) {
245                         watchdog = new RenderFarmWatchdog(watchdog_timeout, this, 0);
246                         watchdog->start();
247                 }
248         }
249
250         if( !result ) Thread::start();
251
252         return result;
253 }
254
255
256
257
258
259
260
261
262
263
264
265 int64_t RenderFarmServerThread::read_int64(int *error)
266 {
267         int temp = 0;
268         if( !error ) error = &temp;
269
270         unsigned char data[sizeof(int64_t)];
271         *error = (read_socket((char*)data, sizeof(int64_t)) !=
272                 sizeof(int64_t));
273
274 // Make it return 1 if error so it can be used to read a result code from the
275 // server.
276         int64_t result = 1;
277         if( !*error ) {
278                 result = (((int64_t)data[0]) << 56) |
279                         (((uint64_t)data[1]) << 48) |
280                         (((uint64_t)data[2]) << 40) |
281                         (((uint64_t)data[3]) << 32) |
282                         (((uint64_t)data[4]) << 24) |
283                         (((uint64_t)data[5]) << 16) |
284                         (((uint64_t)data[6]) << 8)  |
285                         data[7];
286         }
287         return result;
288 }
289
290 int RenderFarmServerThread::write_int64(int64_t value)
291 {
292         unsigned char data[sizeof(int64_t)];
293         data[0] = (value >> 56) & 0xff;
294         data[1] = (value >> 48) & 0xff;
295         data[2] = (value >> 40) & 0xff;
296         data[3] = (value >> 32) & 0xff;
297         data[4] = (value >> 24) & 0xff;
298         data[5] = (value >> 16) & 0xff;
299         data[6] = (value >> 8) & 0xff;
300         data[7] = value & 0xff;
301         return (write_socket((char*)data, sizeof(int64_t)) !=
302                 sizeof(int64_t));
303 }
304
305
306
307 int RenderFarmServerThread::read_socket(char *data, int len)
308 {
309         int bytes_read = 0;
310         int offset = 0;
311 //printf("RenderFarmServerThread::read_socket 1\n");
312         if( watchdog )
313                 watchdog->begin_request();
314         while(len > 0 && bytes_read >= 0)
315         {
316                 enable_cancel();
317                 bytes_read = read(socket_fd, data + offset, len);
318                 disable_cancel();
319                 if( bytes_read > 0 ) {
320                         len -= bytes_read;
321                         offset += bytes_read;
322                 }
323                 else
324                 if( bytes_read < 0 )
325                         break;
326         }
327         if( watchdog )
328                 watchdog->end_request();
329 //printf("RenderFarmServerThread::read_socket 10\n");
330
331         return offset;
332 }
333
334 int RenderFarmServerThread::write_socket(char *data, int len)
335 {
336 //printf("RenderFarmServerThread::write_socket 1\n");
337         int result = write(socket_fd, data, len);
338 //printf("RenderFarmServerThread::write_socket 10\n");
339
340         return result;
341 }
342
343 void RenderFarmServerThread::reallocate_buffer(int size)
344 {
345         if( buffer && buffer_allocated < size ) {
346                 delete [] buffer;
347                 buffer = 0;
348         }
349
350         if( !buffer && size ) {
351                 buffer = new unsigned char[size];
352                 buffer_allocated = size;
353         }
354 }
355
356 void RenderFarmServerThread::run()
357 {
358 // Wait for requests
359         unsigned char header[5];
360         int done = 0;
361         int bytes_read = 0;
362
363
364         buffer = 0;
365         buffer_allocated = 0;
366 //      fs_server = new RenderFarmFSServer(this);
367 //      fs_server->initialize();
368
369
370 // Send command to run package renderer.
371         write_int64(RENDERFARM_PACKAGES);
372
373
374
375         while(!done) {
376
377 // Wait for requests.
378 // Requests consist of request ID's and accompanying buffers.
379 // Get request ID.
380                 bytes_read = read_socket((char*)header, 5);
381 //printf("RenderFarmServerThread::run 1\n");
382                 if( bytes_read != 5 ) {
383                         done = 1;
384                         continue;
385                 }
386
387                 int request_id = header[0];
388                 int64_t request_size = (((u_int32_t)header[1]) << 24) |
389                                         (((u_int32_t)header[2]) << 16) |
390                                         (((u_int32_t)header[3]) << 8)  |
391                                         (u_int32_t)header[4];
392
393                 reallocate_buffer(request_size);
394
395 // Get accompanying buffer
396                 bytes_read = read_socket((char*)buffer, request_size);
397
398 //printf("RenderFarmServerThread::run 2 %d %jd %d\n", request_id, request_size, bytes_read);
399                 if( bytes_read != request_size ) {
400                         done = 1;
401                         continue;
402                 }
403 //printf("RenderFarmServerThread::run 3\n");
404
405                 switch( request_id ) {
406                 case RENDERFARM_PREFERENCES:
407                         send_preferences();
408                         break;
409
410                 case RENDERFARM_ASSET:
411                         send_asset();
412                         break;
413
414                 case RENDERFARM_EDL:
415                         send_edl();
416                         break;
417
418                 case RENDERFARM_PACKAGE:
419                         send_package(buffer);
420                         break;
421
422                 case RENDERFARM_PROGRESS:
423                         set_progress(buffer);
424                         break;
425
426                 case RENDERFARM_SET_RESULT:
427                         set_result(buffer);
428                         break;
429
430                 case RENDERFARM_SET_VMAP:
431                         set_video_map(buffer);
432                         break;
433
434                 case RENDERFARM_GET_RESULT:
435                         get_result();
436                         break;
437
438                 case RENDERFARM_DONE:
439 //printf("RenderFarmServerThread::run 10\n");
440                         done = 1;
441                         break;
442
443                 case RENDERFARM_KEEPALIVE:
444                         break;
445
446                 default:
447 //                      if( fs_server->handle_request(request_id, request_size, (unsigned char*)buffer) ) break;
448                         printf(_("RenderFarmServerThread::run: unknown request %02x\n"), request_id);
449                         break;
450                 }
451 //printf("RenderFarmServerThread::run 10 %d %jd\n", request_id, request_size);
452         }
453 //printf("RenderFarmServerThread::run 20\n");
454
455 // Don't let watchdog kill the entire renderfarm when a client finishes normally.
456         delete watchdog;  watchdog = 0;
457 //      delete fs_server;
458 }
459
460 void RenderFarmServerThread::write_string(char *string)
461 {
462         int i, len;
463         i = 0;
464
465         len = strlen(string) + 1;
466         datagram = new char[len + 4];
467         STORE_INT32(len);
468         memcpy(datagram + i, string, len);
469         write_socket((char*)datagram, len + 4);
470 //printf("RenderFarmServerThread::write_string %02x%02x%02x%02x\n",
471 //      datagram[0], datagram[1], datagram[2], datagram[3]);
472
473         delete [] datagram;
474         datagram = 0;
475 }
476
477 void RenderFarmServerThread::send_preferences()
478 {
479         BC_Hash defaults;
480         char *string;
481
482         server->preferences->save_defaults(&defaults);
483         defaults.save_string(string);
484         write_string(string);
485         free(string);
486 }
487
488 void RenderFarmServerThread::send_asset()
489 {
490         BC_Hash defaults;
491         char *string1;
492
493 // The asset must be sent in two segments.
494 // One segment is stored in the EDL and contains decoding information.
495 // One segment is stored in the asset and contains encoding information.
496         server->default_asset->save_defaults(&defaults, 0, 1, 1, 1, 1, 1);
497         defaults.save_string(string1);
498         FileXML file;
499         server->default_asset->write(&file, 0, 0);
500         file.terminate_string();
501
502         write_string(string1);
503         write_string(file.string());
504         free(string1);
505 }
506
507
508 void RenderFarmServerThread::send_edl()
509 {
510         FileXML file;
511
512 // Save the XML
513         server->edl->save_xml(&file, 0);
514         file.terminate_string();
515 //printf("RenderFarmServerThread::send_edl\n%s\n\n", file.string);
516
517         write_string(file.string());
518 //printf("RenderFarmServerThread::send_edl 2\n");
519 }
520
521
522 void RenderFarmServerThread::send_package(unsigned char *buffer)
523 {
524         this->frames_per_second = (double)((((u_int32_t)buffer[0]) << 24) |
525                 (((u_int32_t)buffer[1]) << 16) |
526                 (((u_int32_t)buffer[2]) << 8)  |
527                 ((u_int32_t)buffer[3])) /
528                 65536.0;
529
530 //printf("RenderFarmServerThread::send_package 1 %f\n", frames_per_second);
531         RenderPackage *package =
532                 server->packages->get_package(frames_per_second,
533                         number,
534                         server->use_local_rate);
535
536 //printf("RenderFarmServerThread::send_package 2\n");
537         datagram = new char[BCTEXTLEN];
538
539 // No more packages
540         if( !package ) {
541 //printf("RenderFarmServerThread::send_package 1\n");
542                 datagram[0] = datagram[1] = datagram[2] = datagram[3] = 0;
543                 write_socket(datagram, 4);
544         }
545         else
546 // Encode package
547         {
548 //printf("RenderFarmServerThread::send_package 10\n");
549                 int i = 4;
550                 strcpy(&datagram[i], package->path);
551                 i += strlen(package->path);
552                 datagram[i++] = 0;
553
554                 STORE_INT32(package->audio_start);
555                 STORE_INT32(package->audio_end);
556                 STORE_INT32(package->video_start);
557                 STORE_INT32(package->video_end);
558                 int use_brender = (server->brender ? 1 : 0);
559                 STORE_INT32(use_brender);
560                 STORE_INT32(package->audio_do);
561                 STORE_INT32(package->video_do);
562
563                 int len = i;
564                 i = 0;
565                 STORE_INT32(len - 4);
566
567                 write_socket(datagram, len);
568         }
569         delete [] datagram;
570         datagram = 0;
571 }
572
573
574 void RenderFarmServerThread::set_progress(unsigned char *buffer)
575 {
576         server->total_return_lock->lock("RenderFarmServerThread::set_progress");
577         *server->total_return += (int64_t)(((u_int32_t)buffer[0]) << 24) |
578                 (((u_int32_t)buffer[1]) << 16) |
579                 (((u_int32_t)buffer[2]) << 8)  |
580                 ((u_int32_t)buffer[3]);
581         frames_per_second = (double)((((u_int32_t)buffer[4]) << 24) |
582                 (((u_int32_t)buffer[5]) << 16) |
583                 (((u_int32_t)buffer[6]) << 8)  |
584                 ((u_int32_t)buffer[7])) /
585                 65536.0;
586         server->total_return_lock->unlock();
587
588         server->preferences->set_rate(frames_per_second, number);
589
590 // This locks the preferences
591         if( server->mwindow ) server->mwindow->preferences->copy_rates_from(
592                 server->preferences);
593 }
594
595 int RenderFarmServerThread::set_video_map(unsigned char *buffer)
596 {
597         if( server->brender ) {
598                 server->brender->set_video_map((int64_t)(((u_int32_t)buffer[0]) << 24) |
599                                                         (((u_int32_t)buffer[1]) << 16) |
600                                                         (((u_int32_t)buffer[2]) << 8)  |
601                                                         ((u_int32_t)buffer[3]),
602                                                         (int64_t)(((u_int32_t)buffer[4]) << 24) |
603                                                         (((u_int32_t)buffer[5]) << 16) |
604                                                         (((u_int32_t)buffer[6]) << 8)  |
605                                                         ((u_int32_t)buffer[7]));
606                 char return_value[1];
607                 return_value[0] = 0;
608                 write_socket(return_value, 1);
609                 return 0;
610         }
611         return 1;
612 }
613
614
615 void RenderFarmServerThread::set_result(unsigned char *buffer)
616 {
617 //printf("RenderFarmServerThread::set_result %p\n", buffer);
618         if( !*server->result_return )
619                 *server->result_return = buffer[0];
620 }
621
622
623 void RenderFarmServerThread::get_result()
624 {
625         unsigned char data[1];
626         data[0] = *server->result_return;
627         write_socket((char*)data, 1);
628 }
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643 RenderFarmWatchdog::RenderFarmWatchdog(int timeout_secs,
644         RenderFarmServerThread *server,
645         RenderFarmClientThread *client)
646  : Thread(1, 0, 0)
647 {
648         this->timeout_usecs = timeout_secs * 1000000;
649         this->server = server;
650         this->client = client;
651         next_request = new Condition(0, "RenderFarmWatchdog::next_request", 0);
652         request_complete = new Condition(0, "RenderFarmWatchdog::request_complete", 0);
653         done = 0;
654 }
655
656 RenderFarmWatchdog::~RenderFarmWatchdog()
657 {
658         done = 1;
659         next_request->unlock();
660         request_complete->unlock();
661         join();
662         delete next_request;
663         delete request_complete;
664 }
665
666 void RenderFarmWatchdog::begin_request()
667 {
668         next_request->unlock();
669 }
670
671 void RenderFarmWatchdog::end_request()
672 {
673         request_complete->unlock();
674 }
675
676 void RenderFarmWatchdog::run()
677 {
678         while(!done) {
679                 next_request->lock("RenderFarmWatchdog::run");
680                 int result = request_complete->timed_lock(timeout_usecs, "RenderFarmWatchdog::run");
681 //printf("RenderFarmWatchdog::run 1 %d\n", result);
682
683                 if( result ) {
684                         if( client ) {
685                                 printf("RenderFarmWatchdog::run 1 killing client pid %d\n", client->pid);
686                                 kill(client->pid, SIGKILL);
687                         }
688                         else if( server ) {
689                                 printf("RenderFarmWatchdog::run 1 killing server thread %p\n", server);
690                                 server->cancel();
691                                 unsigned char buffer[4];
692                                 buffer[0] = 1;
693                                 server->set_result(buffer);
694                         }
695
696                         done = 1;
697                 }
698         }
699 }
700
701
702
703