Merge CV, ver=5.1; ops/methods from HV, and interface from CV where possible
[goodguy/history.git] / cinelerra-5.1 / cinelerra / renderfarmclient.C
1
2 /*
3  * CINELERRA
4  * Copyright (C) 2008 Adam Williams <broadcast at earthling dot net>
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  *
20  */
21
22 #include "asset.h"
23 #include "assets.h"
24 #include "clip.h"
25 #include "bchash.h"
26 #include "dvbtune.h"
27 #include "edl.h"
28 #include "filesystem.h"
29 #include "filexml.h"
30 #include "language.h"
31 #include "mutex.h"
32 #include "mwindow.h"
33 #include "pluginserver.h"
34 #include "preferences.h"
35 #include "renderfarm.h"
36 #include "renderfarmclient.h"
37 //#include "renderfarmfsclient.h"
38 #include "sighandler.h"
39
40 #include <arpa/inet.h>
41 #include <errno.h>
42 #include <fcntl.h>
43 #include <netdb.h>
44 #include <netinet/in.h>
45 #include <stdio.h>
46 #include <string.h>
47 #include <sys/socket.h>
48 #include <sys/types.h>
49 #include <sys/un.h>
50 #include <sys/wait.h>
51 #include <unistd.h>
52
53
54
55
56 // The render client waits for connections from the server.
57 // Then it starts a thread for each connection.
58 RenderFarmClient::RenderFarmClient(int port,
59         char *deamon_path,
60         int nice_value,
61         char *config_path)
62 {
63         char string[BCTEXTLEN];
64
65         this->port = port;
66         this->deamon_path = deamon_path;
67         SigHandler *signals = new SigHandler;
68         signals->initialize();
69
70         this_pid = getpid();
71         (void)nice(nice_value);
72
73
74         MWindow::init_defaults(boot_defaults, config_path);
75         boot_preferences = new Preferences;
76         boot_preferences->load_defaults(boot_defaults);
77         MWindow::init_plugins(0, boot_preferences);
78
79         strcpy(string, boot_preferences->plugin_dir);
80         strcat(string, "/" FONT_SEARCHPATH);
81         BC_Resources::init_fontconfig(string);
82 }
83
84
85
86
87 RenderFarmClient::~RenderFarmClient()
88 {
89 //      delete thread;
90         delete boot_defaults;
91         delete boot_preferences;
92         plugindb->remove_all_objects();
93         delete plugindb;
94 }
95
96
97 void RenderFarmClient::main_loop()
98 {
99         int socket_fd;
100         BC_WindowBase::get_resources()->vframe_shm = 1;
101
102
103 // Open listening port
104
105         if(!deamon_path)
106         {
107                 struct sockaddr_in addr;
108
109                 addr.sin_family = AF_INET;
110                 addr.sin_port = htons((unsigned short)port);
111                 addr.sin_addr.s_addr = htonl(INADDR_ANY);
112
113                 if((socket_fd = socket(PF_INET, SOCK_STREAM, 0)) < 0)
114                 {
115                         perror(_("RenderFarmClient::main_loop: socket"));
116                         return;
117                 }
118
119                 if(bind(socket_fd,
120                         (struct sockaddr*)&addr,
121                         sizeof(addr)) < 0)
122                 {
123                         fprintf(stderr,
124                                 _("RenderFarmClient::main_loop: bind port %d: %s"),
125                                 port,
126                                 strerror(errno));
127                         return;
128                 }
129         }
130         else
131         {
132                 struct sockaddr_un addr;
133                 addr.sun_family = AF_FILE;
134                 strcpy(addr.sun_path, deamon_path);
135                 int size = (offsetof(struct sockaddr_un, sun_path) +
136                         strlen(deamon_path) + 1);
137
138                 if((socket_fd = socket(PF_UNIX, SOCK_STREAM, 0)) < 0)
139                 {
140                         perror(_("RenderFarmClient::main_loop: socket"));
141                         return;
142                 }
143
144                 if(bind(socket_fd,
145                         (struct sockaddr*)&addr,
146                         size) < 0)
147                 {
148                         fprintf(stderr,
149                                 _("RenderFarmClient::main_loop: bind path %s: %s\n"),
150                                 deamon_path,
151                                 strerror(errno));
152                         return;
153                 }
154         }
155
156 // Wait for connections
157         printf("RenderFarmClient::main_loop: client started\n");
158         while(1)
159         {
160                 if(listen(socket_fd, 256) < 0)
161         {
162                 perror(_("RenderFarmClient::main_loop: listen"));
163                 return;
164         }
165
166                 int new_socket_fd;
167
168
169
170                 if(!deamon_path)
171                 {
172                         struct sockaddr_in clientname;
173                         socklen_t size = sizeof(clientname);
174                 if((new_socket_fd = accept(socket_fd,
175                                 (struct sockaddr*)&clientname,
176                                                 &size)) < 0)
177                 {
178                         perror(_("RenderFarmClient::main_loop: accept"));
179                         return;
180                 }
181                         else
182                         {
183 printf("RenderFarmClient::main_loop: Session started from %s\n", inet_ntoa(clientname.sin_addr));
184                                 RenderFarmClientThread *thread =
185                                         new RenderFarmClientThread(this);
186                                 thread->main_loop(new_socket_fd);
187                         }
188                 }
189                 else
190                 {
191                         struct sockaddr_un clientname;
192                         socklen_t size = sizeof(clientname);
193                 if((new_socket_fd = accept(socket_fd,
194                                 (struct sockaddr*)&clientname,
195                                                 &size)) < 0)
196                 {
197                         perror(_("RenderFarmClient::main_loop: accept"));
198                         return;
199                 }
200                         else
201                         {
202 printf("RenderFarmClient::main_loop: Session started from %s\n", clientname.sun_path);
203                                 RenderFarmClientThread *thread =
204                                         new RenderFarmClientThread(this);
205                                 thread->main_loop(new_socket_fd);
206                         }
207                 }
208         }
209 }
210
211 void RenderFarmClient::kill_client()
212 {
213 printf("RenderFarmClient::kill_client 1\n");
214         if(deamon_path)
215         {
216 printf("RenderFarmClient::kill_client 2\n");
217                 remove(deamon_path);
218                 kill(this_pid, SIGKILL);
219         }
220 }
221
222
223 // The thread requests jobs from the server until the job table is empty
224 // or the server reports an error.  This thread must poll the server
225 // after every frame for the error status.
226 // Detaches when finished.
227 RenderFarmClientThread::RenderFarmClientThread(RenderFarmClient *client)
228  : Thread(0, 0, 1)
229 {
230         this->client = client;
231         frames_per_second = 0;
232         Thread::set_synchronous(0);
233 //      fs_client = 0;
234         mutex_lock = new Mutex("RenderFarmClientThread::mutex_lock");
235         watchdog = 0;
236         keep_alive = 0;
237 }
238
239 RenderFarmClientThread::~RenderFarmClientThread()
240 {
241 //      if(fs_client) delete fs_client;
242         delete mutex_lock;
243         delete watchdog;
244         delete keep_alive;
245 }
246
247
248 int RenderFarmClientThread::send_request_header(int request,
249         int len)
250 {
251         unsigned char datagram[5];
252         datagram[0] = request;
253
254         int i = 1;
255         STORE_INT32(len);
256 // printf("RenderFarmClientThread::send_request_header %d %02x%02x%02x%02x%02x\n",
257 // request, datagram[0], datagram[1], datagram[2], datagram[3], datagram[4]);
258
259         return (write_socket((char*)datagram, 5) != 5);
260 }
261
262 int RenderFarmClientThread::write_socket(char *data, int len)
263 {
264 //printf("RenderFarmClientThread::write_socket 1\n");
265         int result = write(socket_fd, data, len);
266 //printf("RenderFarmClientThread::write_socket 10\n");
267         return result;
268 }
269
270 int RenderFarmClientThread::read_socket(char *data, int len)
271 {
272         int bytes_read = 0;
273         int offset = 0;
274 //printf("RenderFarmClientThread::read_socket 1\n");
275         watchdog->begin_request();
276         while(len > 0 && bytes_read >= 0)
277         {
278                 bytes_read = read(socket_fd, data + offset, len);
279                 if(bytes_read > 0)
280                 {
281                         len -= bytes_read;
282                         offset += bytes_read;
283                 }
284                 else
285                 if(bytes_read < 0)
286                 {
287                         break;
288                 }
289         }
290         watchdog->end_request();
291 //printf("RenderFarmClientThread::read_socket 10\n");
292
293         return offset;
294 }
295
296 int RenderFarmClientThread::write_int64(int64_t value)
297 {
298         unsigned char data[sizeof(int64_t)];
299         data[0] = (value >> 56) & 0xff;
300         data[1] = (value >> 48) & 0xff;
301         data[2] = (value >> 40) & 0xff;
302         data[3] = (value >> 32) & 0xff;
303         data[4] = (value >> 24) & 0xff;
304         data[5] = (value >> 16) & 0xff;
305         data[6] = (value >> 8) & 0xff;
306         data[7] = value & 0xff;
307         return (write_socket((char*)data, sizeof(int64_t)) != sizeof(int64_t));
308 }
309
310 int64_t RenderFarmClientThread::read_int64(int *error)
311 {
312         int temp = 0;
313         if(!error) error = &temp;
314
315         unsigned char data[sizeof(int64_t)];
316         *error = (read_socket((char*)data, sizeof(int64_t)) != sizeof(int64_t));
317
318 // Make it return 1 if error so it can be used to read a result code from the
319 // server.
320         int64_t result = 1;
321         if(!*error)
322         {
323                 result = (((int64_t)data[0]) << 56) |
324                         (((uint64_t)data[1]) << 48) |
325                         (((uint64_t)data[2]) << 40) |
326                         (((uint64_t)data[3]) << 32) |
327                         (((uint64_t)data[4]) << 24) |
328                         (((uint64_t)data[5]) << 16) |
329                         (((uint64_t)data[6]) << 8)  |
330                         data[7];
331         }
332         return result;
333 }
334
335 void RenderFarmClientThread::read_string(char* &string)
336 {
337         unsigned char header[4];
338         if(read_socket((char*)header, 4) != 4)
339         {
340                 string = 0;
341                 return;
342         }
343
344         int64_t len = (((u_int32_t)header[0]) << 24) |
345                                 (((u_int32_t)header[1]) << 16) |
346                                 (((u_int32_t)header[2]) << 8) |
347                                 ((u_int32_t)header[3]);
348
349         if(len)
350         {
351                 string = new char[len];
352                 if(read_socket(string, len) != len)
353                 {
354                         delete [] string;
355                         string = 0;
356                 }
357         }
358         else
359                 string = 0;
360
361 }
362
363 void RenderFarmClientThread::abort()
364 {
365         send_completion(socket_fd);
366         close(socket_fd);
367         exit(1);
368 }
369
370 void RenderFarmClientThread::lock(const char *location)
371 {
372         mutex_lock->lock(location);
373 }
374
375 void RenderFarmClientThread::unlock()
376 {
377         mutex_lock->unlock();
378 }
379
380 void RenderFarmClientThread::get_command(int socket_fd, int *command)
381 {
382         int error;
383         *command = read_int64(&error);
384         if(error)
385         {
386                 *command = 0;
387                 return;
388         }
389 }
390
391
392 void RenderFarmClientThread::read_preferences(int socket_fd,
393         Preferences *preferences)
394 {
395         lock("RenderFarmClientThread::read_preferences");
396         send_request_header(RENDERFARM_PREFERENCES,
397                 0);
398
399         char *string;
400         read_string(string);
401
402         BC_Hash defaults;
403         defaults.load_string((char*)string);
404         preferences->load_defaults(&defaults);
405
406         delete [] string;
407         unlock();
408 }
409
410
411
412 void RenderFarmClientThread::read_asset(int socket_fd, Asset *asset)
413 {
414         lock("RenderFarmClientThread::read_asset");
415         send_request_header(RENDERFARM_ASSET,
416                 0);
417
418         char *string1;
419         char *string2;
420         read_string(string1);
421         read_string(string2);
422
423
424
425         FileXML file;
426         file.read_from_string((char*)string2);
427         asset->read(&file);
428
429
430
431         BC_Hash defaults;
432         defaults.load_string((char*)string1);
433         asset->load_defaults(&defaults,
434                 0,
435                 1,
436                 1,
437                 1,
438                 1,
439                 1);
440
441 //printf("RenderFarmClientThread::read_asset %d\n", __LINE__);
442 //asset->dump();
443
444         delete [] string1;
445         delete [] string2;
446         unlock();
447 }
448
449 void RenderFarmClientThread::read_edl(int socket_fd,
450         EDL *edl,
451         Preferences *preferences)
452 {
453         lock("RenderFarmClientThread::read_edl");
454         send_request_header(RENDERFARM_EDL,
455                 0);
456
457         char *string;
458         read_string(string);
459
460
461         FileXML file;
462         file.read_from_string((char*)string);
463         delete [] string;
464
465
466
467
468
469
470
471
472         edl->load_xml(&file,
473                 LOAD_ALL);
474
475
476         unlock();
477 }
478
479 int RenderFarmClientThread::read_package(int socket_fd, RenderPackage *package)
480 {
481         lock("RenderFarmClientThread::read_package");
482         send_request_header(RENDERFARM_PACKAGE,
483                 4);
484
485         unsigned char datagram[5];
486         int i = 0;
487
488
489 // Fails if -ieee isn't set.
490         int64_t fixed = !EQUIV(frames_per_second, 0.0) ?
491                 (int64_t)(frames_per_second * 65536.0) : 0;
492         STORE_INT32(fixed);
493         write_socket((char*)datagram, 4);
494
495
496 //printf("RenderFarmClientThread::read_package 1 %f %ld\n", frames_per_second, fixed);
497         char *data;
498         unsigned char *data_ptr;
499         read_string(data);
500 //printf("RenderFarmClientThread::read_package 2 %p\n", data);
501 // Signifies end of session.
502         if(!data)
503         {
504 //              printf(_("RenderFarmClientThread::read_package no output path recieved.\n"));
505                 unlock();
506                 return 1;
507         }
508
509 //printf("RenderFarmClientThread::read_package 2\n");
510
511
512         data_ptr = (unsigned char*)data;
513         strcpy(package->path, data);
514         data_ptr += strlen(package->path);
515         data_ptr++;
516         package->audio_start = READ_INT32(data_ptr);
517         data_ptr += 4;
518         package->audio_end = READ_INT32(data_ptr);
519         data_ptr += 4;
520         package->video_start = READ_INT32(data_ptr);
521         data_ptr += 4;
522         package->video_end = READ_INT32(data_ptr);
523         data_ptr += 4;
524         package->use_brender = READ_INT32(data_ptr);
525         data_ptr += 4;
526         package->audio_do = READ_INT32(data_ptr);
527         data_ptr += 4;
528         package->video_do = READ_INT32(data_ptr);
529
530         delete [] data;
531         unlock();
532
533         return 0;
534 }
535
536 int RenderFarmClientThread::send_completion(int socket_fd)
537 {
538         lock("RenderFarmClientThread::send_completion");
539         int result = send_request_header(RENDERFARM_DONE, 0);
540         unlock();
541         return result;
542 }
543
544
545 void RenderFarmClientThread::ping_server()
546 {
547         lock("RenderFarmClientThread::ping_server");
548         send_request_header(RENDERFARM_KEEPALIVE, 0);
549         unlock();
550 }
551
552
553
554 void RenderFarmClientThread::main_loop(int socket_fd)
555 {
556         this->socket_fd = socket_fd;
557
558         Thread::start();
559 }
560
561 void RenderFarmClientThread::run()
562 {
563 // Create new memory space
564         pid = fork();
565         if(pid != 0)
566         {
567                 int return_value;
568                 waitpid(pid, &return_value, 0);
569                 return;
570         }
571
572 // Get the pid of the fork if inside the fork
573         pid = getpid();
574
575
576
577         int socket_fd = this->socket_fd;
578
579         init_client_keepalive();
580
581 // Get command to run
582         int command;
583         lock("RenderFarmClientThread::run");
584         get_command(socket_fd, &command);
585         unlock();
586
587 //printf("RenderFarmClientThread::run command=%d\n", command);
588
589         switch(command)
590         {
591                 case RENDERFARM_TUNER:
592                         do_tuner(socket_fd);
593                         break;
594                 case RENDERFARM_PACKAGES:
595                         do_packages(socket_fd);
596                         break;
597         }
598
599         _exit(0);
600 }
601
602
603 void RenderFarmClientThread::init_client_keepalive()
604 {
605         keep_alive = new RenderFarmKeepalive(this);
606         keep_alive->start();
607         watchdog = new RenderFarmWatchdog(0, this);
608         watchdog->start();
609 }
610
611
612
613 void RenderFarmClientThread::do_tuner(int socket_fd)
614 {
615 // Currently only 1 tuner driver.  Maybe more someday.
616         DVBTune server(this);
617         server.main_loop();
618         ::close(socket_fd);
619 }
620
621
622 void RenderFarmClientThread::do_packages(int socket_fd)
623 {
624
625         EDL *edl;
626         RenderPackage *package;
627         Asset *default_asset;
628         Preferences *preferences;
629
630
631
632         FarmPackageRenderer package_renderer(this, socket_fd);
633         int result = 0;
634
635
636
637 //printf("RenderFarmClientThread::run 2\n");
638 // Read settings
639         preferences = new Preferences;
640         default_asset = new Asset;
641         package = new RenderPackage;
642         edl = new EDL;
643         edl->create_objects();
644
645
646
647
648
649
650 //printf("RenderFarmClientThread::run 3\n");
651         read_preferences(socket_fd, preferences);
652 //printf("RenderFarmClientThread::run 4\n");
653         read_asset(socket_fd, default_asset);
654 //printf("RenderFarmClientThread::run 5\n");
655         read_edl(socket_fd, edl, preferences);
656 //edl->dump();
657 //printf("RenderFarmClientThread::run 6\n");
658
659
660
661
662
663
664
665
666         package_renderer.initialize(0,
667                         edl,
668                         preferences,
669                         default_asset);
670
671 // Read packages
672         while(1)
673         {
674 //printf("RenderFarmClientThread::run 5\n");
675                 result = read_package(socket_fd, package);
676 //printf("RenderFarmClientThread::run 6 %d\n", result);
677
678
679 // Finished list
680                 if(result)
681                 {
682 //printf("RenderFarmClientThread::run 7\n");
683
684                         result = send_completion(socket_fd);
685                         break;
686                 }
687
688 //              Timer timer;
689 //              timer.update();
690
691 // Error
692                 if(package_renderer.render_package(package))
693                 {
694 //printf("RenderFarmClientThread::run 8\n");
695                         result = send_completion(socket_fd);
696                         break;
697                 }
698
699
700                 frames_per_second = package_renderer.frames_per_second;
701 //              frames_per_second = (double)(package->video_end - package->video_start) /
702 //                      ((double)timer.get_difference() / 1000);
703
704 //printf("RenderFarmClientThread::run 9\n");
705
706
707
708         }
709
710
711 //printf("RenderFarmClientThread::run 9\n");
712         default_asset->Garbage::remove_user();
713 //printf("RenderFarmClientThread::run 10\n");
714         edl->Garbage::remove_user();
715 //printf("RenderFarmClientThread::run 11\n");
716         delete preferences;
717 printf(_("RenderFarmClientThread::run: Session finished.\n"));
718 }
719
720
721
722
723
724
725
726
727
728 RenderFarmKeepalive::RenderFarmKeepalive(
729         RenderFarmClientThread *client_thread)
730  : Thread(1, 0, 0)
731 {
732         this->client_thread = client_thread;
733         done = 0;
734 }
735
736 RenderFarmKeepalive::~RenderFarmKeepalive()
737 {
738         done = 1;
739         cancel();
740         join();
741 }
742
743 void RenderFarmKeepalive::run()
744 {
745         while(!done)
746         {
747                 enable_cancel();
748                 sleep(5);
749                 disable_cancel();
750                 if(!done)
751                 {
752 //printf("RenderFarmKeepalive::run 1\n");
753 // watchdog thread kills this if it gets stuck
754                         client_thread->ping_server();
755 //printf("RenderFarmKeepalive::run 10\n");
756                 }
757         }
758 }
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774 FarmPackageRenderer::FarmPackageRenderer(RenderFarmClientThread *thread,
775                 int socket_fd)
776  : PackageRenderer()
777 {
778         this->thread = thread;
779         this->socket_fd = socket_fd;
780 }
781
782
783
784 FarmPackageRenderer::~FarmPackageRenderer()
785 {
786 }
787
788
789 int FarmPackageRenderer::get_result()
790 {
791         thread->lock("FarmPackageRenderer::get_result");
792         thread->send_request_header(RENDERFARM_GET_RESULT,
793                 0);
794         unsigned char data[1];
795         data[0] = 1;
796         if(thread->read_socket((char*)data, 1) != 1)
797         {
798                 thread->unlock();
799                 return 1;
800         }
801         thread->unlock();
802         return data[0];
803 }
804
805 void FarmPackageRenderer::set_result(int value)
806 {
807         thread->lock("FarmPackageRenderer::set_result");
808         thread->send_request_header(RENDERFARM_SET_RESULT,
809                 1);
810         unsigned char data[1];
811         data[0] = value;
812         thread->write_socket((char*)data, 1);
813         thread->unlock();
814 }
815
816 void FarmPackageRenderer::set_progress(int64_t total_samples)
817 {
818         thread->lock("FarmPackageRenderer::set_progress");
819         thread->send_request_header(RENDERFARM_PROGRESS,
820                 8);
821         unsigned char datagram[8];
822         int i = 0;
823         STORE_INT32(total_samples);
824
825
826         int64_t fixed = (!EQUIV(frames_per_second, 0.0)) ?
827                 (int64_t)(frames_per_second * 65536.0) : 0;
828         STORE_INT32(fixed);
829
830
831         thread->write_socket((char*)datagram, 8);
832         thread->unlock();
833 }
834
835 int FarmPackageRenderer::set_video_map(int64_t position, int value)
836 {
837         int result = 0;
838         unsigned char datagram[8];
839         char return_value[1];
840         int i = 0;
841
842         thread->lock("FarmPackageRenderer::set_video_map");
843         thread->send_request_header(RENDERFARM_SET_VMAP,
844                 8);
845         STORE_INT32(position);
846         STORE_INT32(value);
847         thread->write_socket((char*)datagram, 8);
848
849 // Get completion since the GUI may be locked for a long time.
850         if(!thread->read_socket(return_value, 1))
851         {
852                 result = 1;
853         }
854
855         thread->unlock();
856         return result;
857 }
858
859
860
861
862
863
864
865
866
867