remove filefork
[goodguy/history.git] / cinelerra-5.0 / cinelerra / renderfarmclient.C
1
2 /*
3  * CINELERRA
4  * Copyright (C) 2008 Adam Williams <broadcast at earthling dot net>
5  * 
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  * 
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  * 
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  * 
20  */
21
22 #include "asset.h"
23 #include "assets.h"
24 #include "clip.h"
25 #include "bchash.h"
26 #include "dvbtune.h"
27 #include "edl.h"
28 #include "filesystem.h"
29 #include "filexml.h"
30 #include "language.h"
31 #include "mutex.h"
32 #include "mwindow.h"
33 #include "pluginserver.h"
34 #include "preferences.h"
35 #include "renderfarm.h"
36 #include "renderfarmclient.h"
37 //#include "renderfarmfsclient.h"
38 #include "sighandler.h"
39
40 #include <arpa/inet.h>
41 #include <errno.h>
42 #include <fcntl.h>
43 #include <netdb.h>
44 #include <netinet/in.h>
45 #include <stdio.h>
46 #include <string.h>
47 #include <sys/socket.h>
48 #include <sys/types.h>
49 #include <sys/un.h>
50 #include <sys/wait.h>
51 #include <unistd.h>
52
53
54
55
56 // The render client waits for connections from the server.
57 // Then it starts a thread for each connection.
58 RenderFarmClient::RenderFarmClient(int port, 
59         char *deamon_path, 
60         int nice_value,
61         char *config_path)
62 {
63         this->port = port;
64         this->deamon_path = deamon_path;
65         SigHandler *signals = new SigHandler;
66         signals->initialize();
67
68         this_pid = getpid();
69         (void)nice(nice_value);
70
71
72         MWindow::init_defaults(boot_defaults, config_path);
73         boot_preferences = new Preferences;
74         boot_preferences->load_defaults(boot_defaults);
75         MWindow::init_plugins(0, boot_preferences);
76 }
77
78
79
80
81 RenderFarmClient::~RenderFarmClient()
82 {
83 //      delete thread;
84         delete boot_defaults;
85         delete boot_preferences;
86         plugindb->remove_all_objects();
87         delete plugindb;
88 }
89
90
91 void RenderFarmClient::main_loop()
92 {
93         int socket_fd;
94         BC_WindowBase::get_resources()->vframe_shm = 1;
95
96
97 // Open listening port
98
99         if(!deamon_path)
100         {
101                 struct sockaddr_in addr;
102
103                 addr.sin_family = AF_INET;
104                 addr.sin_port = htons((unsigned short)port);
105                 addr.sin_addr.s_addr = htonl(INADDR_ANY);
106
107                 if((socket_fd = socket(PF_INET, SOCK_STREAM, 0)) < 0)
108                 {
109                         perror(_("RenderFarmClient::main_loop: socket"));
110                         return;
111                 }
112
113                 if(bind(socket_fd, 
114                         (struct sockaddr*)&addr, 
115                         sizeof(addr)) < 0)
116                 {
117                         fprintf(stderr, 
118                                 _("RenderFarmClient::main_loop: bind port %d: %s"),
119                                 port,
120                                 strerror(errno));
121                         return;
122                 }
123         }
124         else
125         {
126                 struct sockaddr_un addr;
127                 addr.sun_family = AF_FILE;
128                 strcpy(addr.sun_path, deamon_path);
129                 int size = (offsetof(struct sockaddr_un, sun_path) + 
130                         strlen(deamon_path) + 1);
131
132                 if((socket_fd = socket(PF_UNIX, SOCK_STREAM, 0)) < 0)
133                 {
134                         perror(_("RenderFarmClient::main_loop: socket"));
135                         return;
136                 }
137
138                 if(bind(socket_fd, 
139                         (struct sockaddr*)&addr, 
140                         size) < 0)
141                 {
142                         fprintf(stderr, 
143                                 _("RenderFarmClient::main_loop: bind path %s: %s\n"),
144                                 deamon_path,
145                                 strerror(errno));
146                         return;
147                 }
148         }
149
150 // Wait for connections
151         printf("RenderFarmClient::main_loop: client started\n");
152         while(1)
153         {
154                 if(listen(socket_fd, 256) < 0)
155         {
156                 perror(_("RenderFarmClient::main_loop: listen"));
157                 return;
158         }
159
160                 int new_socket_fd;
161
162
163
164                 if(!deamon_path)
165                 {
166                         struct sockaddr_in clientname;
167                         socklen_t size = sizeof(clientname);
168                 if((new_socket_fd = accept(socket_fd,
169                                 (struct sockaddr*)&clientname, 
170                                                 &size)) < 0)
171                 {
172                         perror(_("RenderFarmClient::main_loop: accept"));
173                         return;
174                 }
175                         else
176                         {
177 printf("RenderFarmClient::main_loop: Session started from %s\n", inet_ntoa(clientname.sin_addr));
178                                 RenderFarmClientThread *thread = 
179                                         new RenderFarmClientThread(this);
180                                 thread->main_loop(new_socket_fd);
181                         }
182                 }
183                 else
184                 {
185                         struct sockaddr_un clientname;
186                         socklen_t size = sizeof(clientname);
187                 if((new_socket_fd = accept(socket_fd,
188                                 (struct sockaddr*)&clientname, 
189                                                 &size)) < 0)
190                 {
191                         perror(_("RenderFarmClient::main_loop: accept"));
192                         return;
193                 }
194                         else
195                         {
196 printf("RenderFarmClient::main_loop: Session started from %s\n", clientname.sun_path);
197                                 RenderFarmClientThread *thread = 
198                                         new RenderFarmClientThread(this);
199                                 thread->main_loop(new_socket_fd);
200                         }
201                 }
202         }
203 }
204
205 void RenderFarmClient::kill_client()
206 {
207 printf("RenderFarmClient::kill_client 1\n");
208         if(deamon_path)
209         {
210 printf("RenderFarmClient::kill_client 2\n");
211                 remove(deamon_path);
212                 kill(this_pid, SIGKILL);
213         }
214 }
215
216
217
218
219
220
221
222
223
224
225
226 // The thread requests jobs from the server until the job table is empty
227 // or the server reports an error.  This thread must poll the server
228 // after every frame for the error status.
229 // Detaches when finished.
230 RenderFarmClientThread::RenderFarmClientThread(RenderFarmClient *client)
231  : Thread(0, 0, 1)
232 {
233         this->client = client;
234         frames_per_second = 0;
235         Thread::set_synchronous(0);
236 //      fs_client = 0;
237         mutex_lock = new Mutex("RenderFarmClientThread::mutex_lock");
238         watchdog = 0;
239         keep_alive = 0;
240 }
241
242 RenderFarmClientThread::~RenderFarmClientThread()
243 {
244 //      if(fs_client) delete fs_client;
245         delete mutex_lock;
246         delete watchdog;
247         delete keep_alive;
248 }
249
250
251 int RenderFarmClientThread::send_request_header(int request, 
252         int len)
253 {
254         unsigned char datagram[5];
255         datagram[0] = request;
256
257         int i = 1;
258         STORE_INT32(len);
259 // printf("RenderFarmClientThread::send_request_header %d %02x%02x%02x%02x%02x\n",
260 // request, datagram[0], datagram[1], datagram[2], datagram[3], datagram[4]);
261
262         return (write_socket((char*)datagram, 5) != 5);
263 }
264
265 int RenderFarmClientThread::write_socket(char *data, int len)
266 {
267 //printf("RenderFarmClientThread::write_socket 1\n");
268         int result = write(socket_fd, data, len);
269 //printf("RenderFarmClientThread::write_socket 10\n");
270         return result;
271 }
272
273 int RenderFarmClientThread::read_socket(char *data, int len)
274 {
275         int bytes_read = 0;
276         int offset = 0;
277 //printf("RenderFarmClientThread::read_socket 1\n");
278         watchdog->begin_request();
279         while(len > 0 && bytes_read >= 0)
280         {
281                 bytes_read = read(socket_fd, data + offset, len);
282                 if(bytes_read > 0)
283                 {
284                         len -= bytes_read;
285                         offset += bytes_read;
286                 }
287                 else
288                 if(bytes_read < 0)
289                 {
290                         break;
291                 }
292         }
293         watchdog->end_request();
294 //printf("RenderFarmClientThread::read_socket 10\n");
295
296         return offset;
297 }
298
299 int RenderFarmClientThread::write_int64(int64_t value)
300 {
301         unsigned char data[sizeof(int64_t)];
302         data[0] = (value >> 56) & 0xff;
303         data[1] = (value >> 48) & 0xff;
304         data[2] = (value >> 40) & 0xff;
305         data[3] = (value >> 32) & 0xff;
306         data[4] = (value >> 24) & 0xff;
307         data[5] = (value >> 16) & 0xff;
308         data[6] = (value >> 8) & 0xff;
309         data[7] = value & 0xff;
310         return (write_socket((char*)data, sizeof(int64_t)) != sizeof(int64_t));
311 }
312
313 int64_t RenderFarmClientThread::read_int64(int *error)
314 {
315         int temp = 0;
316         if(!error) error = &temp;
317
318         unsigned char data[sizeof(int64_t)];
319         *error = (read_socket((char*)data, sizeof(int64_t)) != sizeof(int64_t));
320
321 // Make it return 1 if error so it can be used to read a result code from the
322 // server.
323         int64_t result = 1;
324         if(!*error)
325         {
326                 result = (((int64_t)data[0]) << 56) |
327                         (((uint64_t)data[1]) << 48) | 
328                         (((uint64_t)data[2]) << 40) |
329                         (((uint64_t)data[3]) << 32) |
330                         (((uint64_t)data[4]) << 24) |
331                         (((uint64_t)data[5]) << 16) |
332                         (((uint64_t)data[6]) << 8)  |
333                         data[7];
334         }
335         return result;
336 }
337
338 void RenderFarmClientThread::read_string(char* &string)
339 {
340         unsigned char header[4];
341         if(read_socket((char*)header, 4) != 4)
342         {
343                 string = 0;
344                 return;
345         }
346
347         int64_t len = (((u_int32_t)header[0]) << 24) | 
348                                 (((u_int32_t)header[1]) << 16) | 
349                                 (((u_int32_t)header[2]) << 8) | 
350                                 ((u_int32_t)header[3]);
351
352         if(len)
353         {
354                 string = new char[len];
355                 if(read_socket(string, len) != len)
356                 {
357                         delete [] string;
358                         string = 0;
359                 }
360         }
361         else
362                 string = 0;
363
364 }
365
366 void RenderFarmClientThread::abort()
367 {
368         send_completion(socket_fd);
369         close(socket_fd);
370         exit(1);
371 }
372
373 void RenderFarmClientThread::lock(const char *location)
374 {
375         mutex_lock->lock(location);
376 }
377
378 void RenderFarmClientThread::unlock()
379 {
380         mutex_lock->unlock();
381 }
382
383 void RenderFarmClientThread::get_command(int socket_fd, int *command)
384 {
385         int error;
386         *command = read_int64(&error);
387         if(error)
388         {
389                 *command = 0;
390                 return;
391         }
392 }
393
394
395 void RenderFarmClientThread::read_preferences(int socket_fd, 
396         Preferences *preferences)
397 {
398         lock("RenderFarmClientThread::read_preferences");
399         send_request_header(RENDERFARM_PREFERENCES, 
400                 0);
401
402         char *string;
403         read_string(string);
404
405         BC_Hash defaults;
406         defaults.load_string((char*)string);
407         preferences->load_defaults(&defaults);
408
409         delete [] string;
410         unlock();
411 }
412
413
414
415 void RenderFarmClientThread::read_asset(int socket_fd, Asset *asset)
416 {
417         lock("RenderFarmClientThread::read_asset");
418         send_request_header(RENDERFARM_ASSET, 
419                 0);
420
421         char *string1;
422         char *string2;
423         read_string(string1);
424         read_string(string2);
425
426
427
428         FileXML file;
429         file.read_from_string((char*)string2);
430         asset->read(&file);
431         
432
433
434         BC_Hash defaults;
435         defaults.load_string((char*)string1);
436         asset->load_defaults(&defaults,
437                 0,
438                 1,
439                 1,
440                 1,
441                 1,
442                 1);
443
444 //printf("RenderFarmClientThread::read_asset %d\n", __LINE__);
445 //asset->dump();
446
447         delete [] string1;
448         delete [] string2;
449         unlock();
450 }
451
452 void RenderFarmClientThread::read_edl(int socket_fd, 
453         EDL *edl, 
454         Preferences *preferences)
455 {
456         lock("RenderFarmClientThread::read_edl");
457         send_request_header(RENDERFARM_EDL, 
458                 0);
459
460         char *string;
461         read_string(string);
462
463
464         FileXML file;
465         file.read_from_string((char*)string);
466         delete [] string;
467
468
469
470
471
472
473
474
475         edl->load_xml(&file, 
476                 LOAD_ALL);
477
478
479         unlock();
480 }
481
482 int RenderFarmClientThread::read_package(int socket_fd, RenderPackage *package)
483 {
484         lock("RenderFarmClientThread::read_package");
485         send_request_header(RENDERFARM_PACKAGE, 
486                 4);
487
488         unsigned char datagram[5];
489         int i = 0;
490
491
492 // Fails if -ieee isn't set.
493         int64_t fixed = !EQUIV(frames_per_second, 0.0) ? 
494                 (int64_t)(frames_per_second * 65536.0) : 0;
495         STORE_INT32(fixed);
496         write_socket((char*)datagram, 4);
497
498
499 //printf("RenderFarmClientThread::read_package 1 %f %ld\n", frames_per_second, fixed);
500         char *data;
501         unsigned char *data_ptr;
502         read_string(data);
503 //printf("RenderFarmClientThread::read_package 2 %p\n", data);
504 // Signifies end of session.
505         if(!data) 
506         {
507 //              printf(_("RenderFarmClientThread::read_package no output path recieved.\n"));
508                 unlock();
509                 return 1;
510         }
511
512 //printf("RenderFarmClientThread::read_package 2\n");
513
514
515         data_ptr = (unsigned char*)data;
516         strcpy(package->path, data);
517         data_ptr += strlen(package->path);
518         data_ptr++;
519         package->audio_start = READ_INT32(data_ptr);
520         data_ptr += 4;
521         package->audio_end = READ_INT32(data_ptr);
522         data_ptr += 4;
523         package->video_start = READ_INT32(data_ptr);
524         data_ptr += 4;
525         package->video_end = READ_INT32(data_ptr);
526         data_ptr += 4;
527         package->use_brender = READ_INT32(data_ptr);
528
529         delete [] data;
530         unlock();
531
532         return 0;
533 }
534
535 int RenderFarmClientThread::send_completion(int socket_fd)
536 {
537         lock("RenderFarmClientThread::send_completion");
538         int result = send_request_header(RENDERFARM_DONE, 0);
539         unlock();
540         return result;
541 }
542
543
544 void RenderFarmClientThread::ping_server()
545 {
546         lock("RenderFarmClientThread::ping_server");
547         send_request_header(RENDERFARM_KEEPALIVE, 0);
548         unlock();
549 }
550
551
552
553 void RenderFarmClientThread::main_loop(int socket_fd)
554 {
555         this->socket_fd = socket_fd;
556
557         Thread::start();
558 }
559
560 void RenderFarmClientThread::run()
561 {
562 // Create new memory space
563         pid = fork();
564         if(pid != 0)
565         {
566                 int return_value;
567                 waitpid(pid, &return_value, 0);
568                 return;
569         }
570
571 // Get the pid of the fork if inside the fork
572         pid = getpid();
573
574
575
576         int socket_fd = this->socket_fd;
577
578         init_client_keepalive();
579
580 // Get command to run
581         int command;
582         lock("RenderFarmClientThread::run");
583         get_command(socket_fd, &command);
584         unlock();
585
586 //printf("RenderFarmClientThread::run command=%d\n", command);
587
588         switch(command)
589         {
590                 case RENDERFARM_TUNER:
591                         do_tuner(socket_fd);
592                         break;
593                 case RENDERFARM_PACKAGES:
594                         do_packages(socket_fd);
595                         break;
596         }
597
598         _exit(0);
599 }
600
601
602 void RenderFarmClientThread::init_client_keepalive()
603 {
604         keep_alive = new RenderFarmKeepalive(this);
605         keep_alive->start();
606         watchdog = new RenderFarmWatchdog(0, this);
607         watchdog->start();
608 }
609
610
611
612 void RenderFarmClientThread::do_tuner(int socket_fd)
613 {
614 // Currently only 1 tuner driver.  Maybe more someday.
615         DVBTune server(this);
616         server.main_loop();
617         ::close(socket_fd);
618 }
619
620
621 void RenderFarmClientThread::do_packages(int socket_fd)
622 {
623
624         EDL *edl;
625         RenderPackage *package;
626         Asset *default_asset;
627         Preferences *preferences;
628
629
630
631         FarmPackageRenderer package_renderer(this, socket_fd);
632         int result = 0;
633
634
635
636 //printf("RenderFarmClientThread::run 2\n");
637 // Read settings
638         preferences = new Preferences;
639         default_asset = new Asset;
640         package = new RenderPackage;
641         edl = new EDL;
642         edl->create_objects();
643
644
645
646
647
648
649 //printf("RenderFarmClientThread::run 3\n");
650         read_preferences(socket_fd, preferences);
651 //printf("RenderFarmClientThread::run 4\n");
652         read_asset(socket_fd, default_asset);
653 //printf("RenderFarmClientThread::run 5\n");
654         read_edl(socket_fd, edl, preferences);
655 //edl->dump();
656 //printf("RenderFarmClientThread::run 6\n");
657
658
659
660
661
662
663
664
665         package_renderer.initialize(0,
666                         edl, 
667                         preferences, 
668                         default_asset);
669
670 // Read packages
671         while(1)
672         {
673 //printf("RenderFarmClientThread::run 5\n");
674                 result = read_package(socket_fd, package);
675 //printf("RenderFarmClientThread::run 6 %d\n", result);
676
677
678 // Finished list
679                 if(result)
680                 {
681 //printf("RenderFarmClientThread::run 7\n");
682
683                         result = send_completion(socket_fd);
684                         break;
685                 }
686
687 //              Timer timer;
688 //              timer.update();
689
690 // Error
691                 if(package_renderer.render_package(package))
692                 {
693 //printf("RenderFarmClientThread::run 8\n");
694                         result = send_completion(socket_fd);
695                         break;
696                 }
697
698
699                 frames_per_second = package_renderer.frames_per_second;
700 //              frames_per_second = (double)(package->video_end - package->video_start) / 
701 //                      ((double)timer.get_difference() / 1000);
702
703 //printf("RenderFarmClientThread::run 9\n");
704
705
706
707         }
708
709
710 //printf("RenderFarmClientThread::run 9\n");
711         default_asset->Garbage::remove_user();
712 //printf("RenderFarmClientThread::run 10\n");
713         edl->Garbage::remove_user();
714 //printf("RenderFarmClientThread::run 11\n");
715         delete preferences;
716 printf(_("RenderFarmClientThread::run: Session finished.\n"));
717 }
718
719
720
721
722
723
724
725
726
727 RenderFarmKeepalive::RenderFarmKeepalive(
728         RenderFarmClientThread *client_thread)
729  : Thread(1, 0, 0)
730 {
731         this->client_thread = client_thread;
732         done = 0;
733 }
734
735 RenderFarmKeepalive::~RenderFarmKeepalive()
736 {
737         done = 1;
738         cancel();
739         join();
740 }
741
742 void RenderFarmKeepalive::run()
743 {
744         while(!done)
745         {
746                 enable_cancel();
747                 sleep(5);
748                 disable_cancel();
749                 if(!done)
750                 {
751 //printf("RenderFarmKeepalive::run 1\n");
752 // watchdog thread kills this if it gets stuck
753                         client_thread->ping_server();
754 //printf("RenderFarmKeepalive::run 10\n");
755                 }
756         }
757 }
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773 FarmPackageRenderer::FarmPackageRenderer(RenderFarmClientThread *thread,
774                 int socket_fd)
775  : PackageRenderer()
776 {
777         this->thread = thread;
778         this->socket_fd = socket_fd;
779 }
780
781
782
783 FarmPackageRenderer::~FarmPackageRenderer()
784 {
785 }
786
787
788 int FarmPackageRenderer::get_result()
789 {
790         thread->lock("FarmPackageRenderer::get_result");
791         thread->send_request_header(RENDERFARM_GET_RESULT, 
792                 0);
793         unsigned char data[1];
794         data[0] = 1;
795         if(thread->read_socket((char*)data, 1) != 1)
796         {
797                 thread->unlock();
798                 return 1;
799         }
800         thread->unlock();
801         return data[0];
802 }
803
804 void FarmPackageRenderer::set_result(int value)
805 {
806         thread->lock("FarmPackageRenderer::set_result");
807         thread->send_request_header(RENDERFARM_SET_RESULT, 
808                 1);
809         unsigned char data[1];
810         data[0] = value;
811         thread->write_socket((char*)data, 1);
812         thread->unlock();
813 }
814
815 void FarmPackageRenderer::set_progress(int64_t total_samples)
816 {
817         thread->lock("FarmPackageRenderer::set_progress");
818         thread->send_request_header(RENDERFARM_PROGRESS, 
819                 8);
820         unsigned char datagram[8];
821         int i = 0;
822         STORE_INT32(total_samples);
823         
824
825         int64_t fixed = (!EQUIV(frames_per_second, 0.0)) ? 
826                 (int64_t)(frames_per_second * 65536.0) : 0;
827         STORE_INT32(fixed);
828         
829         
830         thread->write_socket((char*)datagram, 8);
831         thread->unlock();
832 }
833
834 int FarmPackageRenderer::set_video_map(int64_t position, int value)
835 {
836         int result = 0;
837         unsigned char datagram[8];
838         char return_value[1];
839         int i = 0;
840
841         thread->lock("FarmPackageRenderer::set_video_map");
842         thread->send_request_header(RENDERFARM_SET_VMAP, 
843                 8);
844         STORE_INT32(position);
845         STORE_INT32(value);
846         thread->write_socket((char*)datagram, 8);
847
848 // Get completion since the GUI may be locked for a long time.
849         if(!thread->read_socket(return_value, 1))
850         {
851                 result = 1;
852         }
853
854         thread->unlock();
855         return result;
856 }
857
858
859
860
861
862
863
864
865
866