2 #if defined(__TERMUX__)
// Local shorthand for the nested type zmpeg3_t::zloc_t.
7 typedef zmpeg3_t::zloc_t zzloc_t;
// Return statements of two thin wrappers around syscall(); the enclosing
// signatures are not shown in this excerpt.
// Invokes the sched_yield system call and returns its result.
12 return syscall(SYS_sched_yield);
// Invokes the gettid system call and returns its result.
18 return syscall(SYS_gettid);
// Repeat the FUTEX_WAKE request (for nwakeups) until zfutex returns a
// non-negative value; the loop has an empty body.
25 while( (ret=zfutex(FUTEX_WAKE, nwakeups)) < 0 );
// Issue a FUTEX_WAIT request with val and return whatever zfutex reports.
32 return zfutex(FUTEX_WAIT, val);
// Diagnostic written to stderr; the condition that guards it is not shown here.
38 fprintf(stderr,"unlocking and not locked\n");
// Enter the loop when v is nonzero or the first zxchg(1,loc) returns a
// non-negative value; the loop body is not shown in this excerpt.
45 if( v || zxchg(1,loc) >= 0 ) do {
// Keep looping for as long as zxchg(1,loc) returns a non-negative value.
47 } while ( zxchg(1,loc) >= 0 );
// Fragments that adjust the counter loc around the lock lk; the enclosing
// function signatures are not shown in this excerpt.
// Decrement loc, then acquire lk.
61 zdecr(loc); lk.lock();
// Increment loc, then release lk.
62 zincr(loc); lk.unlock();
// When r is negative, decrement loc.
76 if( r < 0 ) zdecr(loc);
// Sample loc into v and call zwait(v) for as long as the sample is non-negative.
77 int v; while( (v=loc) >= 0 ) zwait(v);
// When r is negative, increment loc.
83 if( r < 0 ) zincr(loc);
// Fragments coordinating the users count with the blocking marker; the
// enclosing function signatures are not shown in this excerpt.
// While blocking is set: release this lock, pass through lk (lock then
// immediately unlock), and re-acquire this lock before testing again.
93 while( blocking ) { unlock(); lk.lock(); lk.unlock(); lock(); }
// Drop one user; if the count reaches zero while blocking is set, call wake().
102 if( !--users && blocking ) wake();
// Store the calling thread's id in blocking.
110 blocking = pthread_self();
// Until users reaches zero: release the lock, wait(), then re-acquire it.
113 while( users ) { unlock(); wait(); lock(); }
// Clear the blocking marker and release lk.
121 blocking = 0; lk.unlock();
// buffer_t construction taking a zmpeg3_t pointer and access flags, followed
// by cleanup fragments; braces and several statements are not shown here.
127 buffer_t(zmpeg3_t *zsrc, int access)
// Keep the access flags for later mode checks.
130 access_type = access;
// alloc is IO_SIZE unless io_SEQUENTIAL is set, in which case SEQ_IO_SIZE.
131 alloc = !(access_type & io_SEQUENTIAL) ? IO_SIZE : SEQ_IO_SIZE;
// Record the calling thread's id in owner.
137 owner = pthread_self();
// Threaded-mode check; the statement it guards is not shown here.
143 if( (access_type & io_THREADED) )
// Free the data array if one is present.
145 if( data ) delete [] data;
// Body fragments of the reader loop; the enclosing signature, braces and some
// statements are not shown in this excerpt.
// Enable cancellation for this thread and make it asynchronous.
166 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,0);
167 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,0);
// Run until reader_done becomes nonzero.
169 while( !reader_done ) {
// Cap size so that size + MAX_IO_SIZE does not exceed alloc.
175 if( size+MAX_IO_SIZE > alloc )
176 size = alloc - MAX_IO_SIZE;
// Request zmpeg3_t::MAX_IO_SIZE bytes from read_in; count holds its result.
178 int64_t count = read_in(zmpeg3_t::MAX_IO_SIZE);
// Add count to size; the statement run when the sum exceeds alloc is not shown.
183 if( (size+=count) > alloc )
// When src->recd_fd is non-negative and the_writer is set, unblock write_lock.
185 if( src->recd_fd >= 0 && the_writer )
186 write_lock.unblock();
// reader taking an untyped pointer; the same name is handed to pthread_create
// as the start routine for the_reader elsewhere in this file.
195 reader(void *the_buffer)
// Recover the buffer_t pointer from the untyped argument.
197 buffer_t *b = (buffer_t *)the_buffer;
// Body fragments of the writer loop; the enclosing signature, braces and some
// statements are not shown in this excerpt.
// Size cap passed to write_record on each pass: 0x100000 bytes.
205 const int mx_blksz = 0x100000;
// Run until writer_done becomes nonzero.
207 while( !writer_done ) {
// Write at most mx_blksz bytes, using mask 0xfff (write_record clears the
// masked low bits of the byte count).
208 write_record(mx_blksz,0xfff);
// Fewer than mx_blksz bytes lie between write_pos and file_pos; the guarded
// statement is not shown here.
209 if( file_pos - write_pos < mx_blksz )
// Call with an INT_MAX size cap and a zero mask, so no bits are cleared.
215 write_record(INT_MAX, 0);
// writer taking an untyped pointer; the same name is handed to pthread_create
// as the start routine for the_writer elsewhere in this file.
220 writer(void *the_buffer)
// Recover the buffer_t pointer from the untyped argument.
222 buffer_t *b = (buffer_t *)the_buffer;
// Opens path for reading using either stdio or a raw descriptor, depending on
// access_type; braces, error handling and the return are not shown here.
228 open_file(char *path)
230 //zmsgs("1 %s\n", path);
// Threaded access also sets the io_SINGLE_ACCESS and io_SEQUENTIAL flags.
232 if( (access_type & io_THREADED) )
233 access_type |= io_SINGLE_ACCESS + io_SEQUENTIAL;
// io_UNBUFFERED clear: open through stdio in binary read mode.
234 if( !(access_type & io_UNBUFFERED) ) {
// A null FILE pointer means fopen failed; the handling is not shown.
236 if( !(fp=::fopen(path, "rb")) ) {
// Raw descriptor open: read-only, plus O_NONBLOCK when io_NONBLOCK is set.
244 int mode = (access_type & io_NONBLOCK) ? O_RDONLY+O_NONBLOCK : O_RDONLY;
// A negative descriptor means open failed; the handling is not shown.
245 if( (fd=::open(path, mode)) < 0 ) {
// Threaded-mode check; the statement it guards is not shown here.
251 if( (access_type & io_THREADED) )
// Fragments that zero the buffer bookkeeping and rewind the underlying file;
// the enclosing signatures are not shown in this excerpt.
// Zero the buffer indices and the byte count.
263 in = fin = out = size = 0;
// Zero the file positions and the nudge offset.
264 file_pos = out_pos = file_nudge = 0;
// Only when io_SEQUENTIAL is clear: seek the underlying file back to offset 0.
272 if( !(access_type & io_SEQUENTIAL) ) {
// io_UNBUFFERED clear: rewind the stdio stream; report a failure through perrs.
273 if( !(access_type & io_UNBUFFERED) ) {
274 if( ::fseek(fp,0,SEEK_SET) < 0 )
275 perrs("fseek %jd", file_pos);
// Raw descriptor: rewind with lseek; report a failure through perrs.
278 if( ::lseek(fd,0,SEEK_SET) < 0 )
279 perrs("lseek %jd", file_pos);
// Fragments that start and stop the reader thread; the enclosing signatures
// and the initial value of tmo are not shown in this excerpt.
// Create the reader thread with this object as its argument.
290 pthread_create(&the_reader,0,reader,this);
// Return early if reader_done is already nonzero or no reader thread is recorded.
296 if( reader_done || !the_reader ) return;
// Poll every 100000 microseconds until reader_done goes negative or tmo runs out.
300 while( reader_done >= 0 && --tmo >= 0 ) usleep(100000);
// tmo ran out: cancel the reader thread.
301 if( tmo < 0 ) pthread_cancel(the_reader);
// Fragments that start and stop the writer thread; the enclosing signatures
// and the initial value of tmo are not shown in this excerpt.
// Create the writer thread with this object as its argument.
310 pthread_create(&the_writer,0,writer,this);
// Return 1 if writer_done is already nonzero or no writer thread is recorded.
317 if( writer_done || !the_writer ) return 1;
// Release write_lock.
319 write_lock.unblock();
// Wait for the writer thread to finish.
320 pthread_join(the_writer,0);
// Poll every 100000 microseconds until writer_done goes negative or tmo runs out.
322 while( writer_done >= 0 && --tmo >= 0 ) usleep(100000);
// tmo ran out: cancel the writer thread.
323 if( tmo < 0 ) pthread_cancel(the_writer);
// Scans buffered data for a stream start pattern and sets write_pos from the
// result. NOTE(review): presumably the body of write_align(sz) — the signature,
// braces and several statements are not shown in this excerpt.
// Scan allowance: twice the packet size, or sz when that is larger.
331 int isz = 2*src->packet_size;
332 if( sz > isz ) isz = sz;
// Program stream: match the four bytes 00 00 01 ba.
335 if( src->is_program_stream() ) {
336 static uint8_t pack_start[4] = { 0x00, 0x00, 0x01, 0xba };
337 pat = pack_start; psz = sizeof(pack_start);
// Transport stream: match the single byte 0x47.
339 else if( src->is_transport_stream() ) {
340 static uint8_t sync_start[1] = { 0x47 };
341 pat = sync_start; psz = sizeof(sync_start);
// Limit isz to out_pos.
343 if( isz > out_pos ) isz = out_pos;
// bsz: distance from out_pos to file_pos.
344 int64_t bsz = file_pos - out_pos;
// Limit isz to alloc - bsz, or to 0 when bsz exceeds alloc.
345 int i = bsz>alloc ? 0 : alloc-bsz;
346 if( isz > i ) isz = i;
// Wrap a negative index by adding alloc.
348 if( i < 0 ) i += alloc;
// bfr is the scan cursor; lmt marks one past the end of the data array.
349 uint8_t *bfr = &data[i];
350 uint8_t *lmt = &data[alloc];
// Number of bytes to examine.
351 int64_t len = isz + bsz;
// Compare one byte at a time; i counts consecutive matched pattern bytes.
// NOTE(review): on a mismatch i resets to 0 without re-testing the current
// byte against pat[0], so a sequence such as 00 00 00 01 ba would not be
// matched by the 4-byte pattern — confirm whether that matters here.
354 while( --len >= 0 ) {
355 if( *bfr++ != pat[i] ) { i = 0; }
356 else if( ++i >= psz ) break;
// Wrap the cursor back to the start of the data array.
357 if( bfr >= lmt ) bfr = &data[0];
// len went negative without a full match: return 1.
360 if( len < 0 ) return 1;
// Step back psz bytes from the cursor, wrapping by alloc if needed.
362 if( (bfr-=psz) < &data[0] ) bfr += alloc;
// Set write_pos to file_pos minus the remaining len.
364 write_pos = file_pos - len;
368 // write blocks of (mask+1) bytes of data at data+wout, update wout
369 // only write full blocks, fragments cause disk thrashing
// sz caps the byte count and mask selects the low bits cleared from it; the
// declaration of len and the surrounding braces are not shown in this excerpt.
371 write_record(int sz, int mask)
// isz: bytes between write_pos and file_pos, capped at sz.
373 int isz = file_pos - write_pos;
374 if( isz > sz ) isz = sz;
// Clear the masked bits; return if nothing is left.
375 if( !(isz &= ~mask) ) return;
376 //zmsgs(" isz=%d, file_pos=%ld, write_pos=%ld\n", isz, file_pos, write_pos);
// n: bytes from wout to the end of the data array, limited to len.
379 int n = alloc - wout;
380 if( n > len ) n = len;
// Hand n bytes starting at data[wout] to src->write_record.
382 src->write_record(&data[wout],n);
// Hand len bytes from the start of the data array to src->write_record and
// set wout to len.
386 src->write_record(&data[0],wout=len);
// Fragments that release the underlying file once the open count reaches
// zero. NOTE(review): presumably the body of buffer_t's close_file — the
// signature and braces are not shown in this excerpt.
// Decrement a positive open_count; continue only when it reaches zero.
394 if( open_count > 0 && --open_count == 0 ) {
// Threaded-mode check; the statement it guards is not shown here.
395 if( (access_type & io_THREADED) )
// Proceed when src is null or src->iopened is zero.
397 if( !src || !src->iopened ) {
// io_UNBUFFERED clear: close the stdio stream and null the pointer.
398 if( !(access_type & io_UNBUFFERED) ) {
399 if( fp != 0 ) { fclose(fp); fp = 0; }
// Raw descriptor: close it and reset fd to -1.
402 if( fd >= 0 ) { close(fd); fd = -1; }
// Reads from the underlying file into the data array at index in.
// NOTE(review): presumably the body of read_in(len) — the signature, braces
// and several statements are not shown in this excerpt.
408 /* sole user of in ptr (except restart) */
// Loop while count is below len.
413 while( count < len ) {
// xfr: bytes still wanted, limited to the space from in to the end of the array.
414 int xfr = len - count;
415 int avl = alloc - in;
416 if( avl < xfr ) xfr = avl;
// Raw descriptor path.
417 if( access_type & io_UNBUFFERED ) {
// io_NONBLOCK set: poll with select() before each read attempt.
418 if( access_type & io_NONBLOCK ) {
// Retry while avl is negative, with a 200000 microsecond select timeout per try.
420 for( int i=10; avl<0 && --i>=0; ) { // 10 retries, 2 seconds
421 struct timeval tv; tv.tv_sec = 0; tv.tv_usec = 200000;
422 fd_set rd_fd; FD_ZERO(&rd_fd); FD_SET(fd, &rd_fd);
423 int ret = select(fd+1, &rd_fd, 0, 0, &tv);
// fd not flagged readable: try again.
426 if( !FD_ISSET(fd, &rd_fd) ) continue;
// A positive read result becomes avl.
427 if( (ret=::read(fd, &data[in], xfr)) > 0 ) avl = ret;
// Plain read(); avl takes its return value.
431 avl = ::read(fd, &data[in], xfr);
// stdio path: fread into the array; a zero count with ferror set becomes -1.
434 avl = ::fread(&data[in], 1, xfr, fp);
435 if( !avl && ferror(fp) ) avl = -1;
// EOVERFLOW check; the handling is not shown here.
438 if( errno == EOVERFLOW ) {
// io_ERRFAIL clear: zero-fill the span.
443 if( !(access_type & io_ERRFAIL) ) {
// While errs is below IO_ERR_LIMIT, limit the span to ERR_PACKET_SIZE.
444 if( errs < IO_ERR_LIMIT && xfr > ERR_PACKET_SIZE )
445 xfr = ERR_PACKET_SIZE;
446 memset(&data[in],0,xfr);
// Non-threaded: seek the underlying file to file_pos + count + xfr.
447 if( !(access_type & io_THREADED) ) {
448 int64_t pos = file_pos + count + xfr;
449 if( (access_type & io_UNBUFFERED) ) {
450 if( ::lseek(fd,pos,SEEK_SET) < 0 )
451 perrs("lseek pos %jx",pos);
454 if( ::fseek(fp,pos,SEEK_SET) < 0 )
455 perrs("fseek pos %jx",pos);
// Report the position file_pos + count through perrs.
461 perrs("read pos %jx",file_pos + count);
// A zero result ends the loop.
467 if( avl == 0 ) break;
// paused set: go round the loop again.
468 if( paused ) continue;
// Wrap in by alloc once it reaches or passes the end of the array.
471 if( (avl=in-alloc) >= 0 ) in = avl;
// Waits for file_pos to pass pos in threaded mode. NOTE(review): presumably
// the body of wait_in(pos) — the signature and loop body are not shown here.
// Applies only to threaded access when restarted is not set.
480 if( (access_type & io_THREADED) && !restarted ) {
// Loop while the reader is running, no restart occurred, and file_pos <= pos.
481 while( !reader_done && !restarted && file_pos <= pos ) {
// result becomes 1 when the reader finished or a restart occurred.
487 if( reader_done || restarted ) result = 1;
// Repositions the underlying file to pos. NOTE(review): presumably the body
// of seek_in(pos) — the signature, braces and return are not shown here.
// Act only when pos differs from file_pos.
496 if( file_pos != pos ) {
// io_SEQUENTIAL clear: seek with lseek64 (raw descriptor) or fseeko (stdio);
// result is 0 on success and 1 on failure.
497 if( !(access_type & io_SEQUENTIAL) ) {
498 result = (access_type & io_UNBUFFERED) ?
499 (lseek64(fd, pos, SEEK_SET) >= 0 ? 0 : 1) :
500 (fseeko(fp, pos, SEEK_SET) == 0 ? 0 : 1) ;
// Sequential access with a nonzero target.
502 else if( pos != 0 ) {
// Threaded-mode check; the statement it guards is not shown here.
503 if( (access_type & io_THREADED) )
// Log the attempted seek through zerrs.
505 zerrs("seek on sequential from %jd to %jd\n", file_pos, pos);
// Calls read_in(len) and adds the result to size; braces and some statements
// are not shown in this excerpt.
// Returns 0 when len and count are both nonzero, otherwise 1.
517 read_fin(int64_t len)
// count holds the result of read_in(len).
519 int64_t count = read_in(len);
520 /* must already be locked */
// Add count to size; the statement run when the sum exceeds alloc is not shown.
523 if( (size+=count) > alloc )
525 return len && count ? 0 : 1;
// Calls seek_in(pos), sets file_pos and out_pos to pos, then calls
// read_fin(len); the conditions between these steps and the return are not
// shown in this excerpt.
529 seek_to(int64_t pos, int64_t len)
531 //zmsgs(" pos=%ld, len=%ld\n", pos, len);
532 int result = seek_in(pos);
// Both the file position and the output position become pos.
534 file_pos = out_pos = pos;
// result takes read_fin's return value.
536 result = read_fin(len);
// Reads forward from file_pos toward pos. NOTE(review): presumably the body
// of read_to(pos) — the signature and guarding conditions are not shown here.
544 //zmsgs(" pos=%ld\n", pos);
// len: distance from file_pos to pos.
546 int64_t len = pos - file_pos;
// Log a target that lies before file_pos; the guard is not shown.
548 zerrs("reversed seq read (%jd < %jd)\n", pos, file_pos);
// result takes read_fin's return value.
552 result = read_fin(len);
// Brings the buffer to cover pos and points out / out_pos at it; the
// signature, braces and several statements are not shown in this excerpt.
// start_pos: file_pos minus size.
561 int64_t start_pos = file_pos - size;
562 if( pos < start_pos ) { /* before buffer */
// Threaded access: log, then add (start_pos + size/2) - pos to file_nudge.
563 if( (access_type & io_THREADED) ) {
565 zerrs("threaded sync before buffer %jd < %jd\n", pos, start_pos);
566 int64_t mid_pos = start_pos + size/2;
567 file_nudge += mid_pos - pos;
571 restart(0); /* allow fake seek to start */
// 0 when pos is below file_pos; otherwise wait_in(pos) for threaded access,
// or -1 when not threaded.
579 result = pos < file_pos ? 0 : /* in buffer */
580 (access_type & io_THREADED) ?
581 wait_in(pos) : -1; /* after buffer */
582 if( result < 0 ) { /* out of buffer */
583 //zmsgs("out of buffer pos=%ld, start_pos=%ld, file_pos=%ld, out_pos=%ld\n",
584 // pos, start_pos, file_pos, out_pos);
// Window of alloc bytes starting alloc/2 before pos; end_pos is computed
// before seek_pos is clamped to zero.
585 int64_t seek_pos = pos - alloc/2;
586 int64_t end_pos = seek_pos + alloc;
587 if( seek_pos < 0 ) seek_pos = 0;
// Sequential access, or file_pos inside (seek_pos, end_pos]: read_to(end_pos);
// otherwise seek_to(seek_pos, alloc).
588 result = (access_type & io_SEQUENTIAL) ||
589 (seek_pos < file_pos && end_pos >= file_pos) ?
590 read_to(end_pos) : seek_to(seek_pos, alloc);
// offset: distance from pos to file_pos.
593 int64_t offset = file_pos - pos;
// When offset lies within [0, size]: convert it to an index relative to fin
// (wrapping by alloc) and store it in out, with out_pos set to pos.
594 if( offset >= 0 && offset <= size ) {
595 if( (offset=fin-offset) < 0 ) offset += alloc;
596 out = offset; out_pos = pos;
// Copies buffered bytes from the data array at index out into bfr, up to len
// bytes; the declaration of count, one statement between the two clamps, the
// braces and the return are not shown in this excerpt.
605 read_out(uint8_t *bfr,int len)
// Loop while count is below len.
608 while( count < len ) {
// avail: bytes between out_pos and file_pos; stop when none remain.
609 int avail = file_pos - out_pos;
610 if( avail <= 0 ) break;
// One fragment runs at most to the end of the data array.
611 int fragment_size = alloc - out;
612 if( fragment_size > avail ) fragment_size = avail;
// Second clamp against avail; a statement between the two clamps is not
// shown, so avail may have been changed.
614 if( fragment_size > avail ) fragment_size = avail;
// Copy the fragment and advance the destination pointer.
615 memcpy(bfr, &data[out], fragment_size);
616 bfr += fragment_size;
// Advance out, resetting it to 0 at the end of the array.
617 out += fragment_size;
618 if( out >= alloc ) out = 0;
// Advance the output position and the running count.
619 out_pos += fragment_size;
620 count += fragment_size;
// Formats "/proc/self/fd/<fd>" into pfn and reads that symbolic link into nm
// (capacity sz); the declaration of pfn and the braces are not shown here.
// Returns the value returned by readlink.
// NOTE(review): readlink returns a byte count on success or -1 on error and
// does not append a terminating NUL, while callers in this file test
// !fd_name(...) — confirm the intended success check and termination.
625 static int fd_name(int fd,char *nm,int sz)
628 snprintf(&pfn[0],sizeof(pfn),"/proc/self/fd/%d",fd);
629 return readlink(&pfn[0],nm,sz);
// fs_t construction from a file path; braces and several statements are not
// shown in this excerpt.
633 fs_t(zmpeg3_t *zsrc, const char *fpath, int access)
// Allocate the buffer_t with the caller's access flags.
637 buffer = new buffer_t(src, access);
// Single-access check; the statement it guards is not shown here.
638 if( (access & io_SINGLE_ACCESS) )
// fs_t construction from an open descriptor; braces and several statements
// are not shown in this excerpt.
643 fs_t(zmpeg3_t *zsrc, int fd, int access)
// Set io_UNBUFFERED and io_SINGLE_ACCESS; threaded also sets io_SEQUENTIAL.
646 access |= io_UNBUFFERED;
647 access |= io_SINGLE_ACCESS;
648 if( (access & io_THREADED) ) access |= io_SEQUENTIAL;
649 buffer = new buffer_t(src, access);
// When fd_name returns 0, pass path to css.get_keys.
// NOTE(review): fd_name returns readlink's result, which is a byte count on
// success — confirm that testing for 0 is the intended condition.
650 if( !fd_name(fd,&path[0],sizeof(path)) )
651 css.get_keys(&path[0]);
// Set the buffer's open_count to 1.
654 buffer->open_count = 1;
// Threaded access: start the reader.
656 if( (buffer->access_type & io_THREADED) )
657 buffer->start_reader();
// fs_t construction from an open stdio stream; braces and several statements
// (including where fd is obtained) are not shown in this excerpt.
661 fs_t(zmpeg3_t *zsrc, FILE *fp, int access)
// Clear io_UNBUFFERED, set io_SINGLE_ACCESS; threaded also sets io_SEQUENTIAL.
664 access &= ~io_UNBUFFERED;
665 access |= io_SINGLE_ACCESS;
666 if( (access & io_THREADED) ) access |= io_SEQUENTIAL;
667 buffer = new buffer_t(src, access);
// When fd_name returns 0, pass path to css.get_keys.
// NOTE(review): fd_name returns readlink's result, which is a byte count on
// success — confirm that testing for 0 is the intended condition.
669 if( !fd_name(fd,&path[0],sizeof(path)) )
670 css.get_keys(&path[0]);
// Set the buffer's open_count to 1.
673 buffer->open_count = 1;
// Threaded access: start the reader.
675 if( (buffer->access_type & io_THREADED) )
676 buffer->start_reader();
// Fragments that initialise this object from another fs object named fs; the
// signature, braces and several statements are not shown in this excerpt.
// Copy the path string and the total byte count.
690 strcpy(path, fs.path);
691 total_bytes = fs.total_bytes;
// Nothing more to do when the other object has no buffer.
693 if( !fs.buffer ) return;
694 int access = fs.buffer->access_type;
// Single-access: copy is_open and increment buffer->open_count (the
// assignment of buffer is not shown here).
695 if( (access & io_SINGLE_ACCESS) ) {
698 is_open = fs.is_open;
700 ++buffer->open_count;
// Allocate a new buffer_t with the same access flags.
703 buffer = new buffer_t(src, access);
// Single-byte reads forwarded to the buffer; the enclosing signatures and
// returns are not shown in this excerpt.
// ret holds the result of buffer->next_byte().
710 uint32_t ret = buffer->next_byte();
// result holds the result of buffer->get_byte().
719 uint32_t result = buffer->get_byte();
// Reads two bytes and combines them with the first byte as the high-order
// byte; the signature, the declarations of a and b, and the braces are not
// shown in this excerpt.
// When current_byte+2 is below buffer->file_tell(): read both bytes directly.
730 if( current_byte+2 < buffer->file_tell() ) {
731 b = buffer->get_byte();
732 a = buffer->get_byte();
// Other path: call chk_next() after the first byte, then increment
// current_byte after the last.
736 b = buffer->get_byte(); chk_next();
737 a = buffer->get_byte(); ++current_byte;
// b is the high byte, a the low byte.
740 uint32_t result = (b << 8) | (a);
// Reads three bytes and combines them with the first byte as the high-order
// byte; the signature, the declarations of a..c, and the braces are not shown
// in this excerpt.
// When current_byte+3 is below buffer->file_tell(): read all bytes directly.
749 if( current_byte+3 < buffer->file_tell() ) {
750 c = buffer->get_byte();
751 b = buffer->get_byte();
752 a = buffer->get_byte();
// Other path: call chk_next() after every byte but the last, then increment
// current_byte.
756 c = buffer->get_byte(); chk_next();
757 b = buffer->get_byte(); chk_next();
758 a = buffer->get_byte(); ++current_byte;
// c is the high byte, a the low byte.
761 uint32_t result = (c << 16) | (b << 8) | (a);
// Reads four bytes and combines them with the first byte as the high-order
// byte; the signature, the declarations of a..d, and the braces are not shown
// in this excerpt.
// When current_byte+4 is below buffer->file_tell(): read all bytes directly.
770 if( current_byte+4 < buffer->file_tell() ) {
771 d = buffer->get_byte();
772 c = buffer->get_byte();
773 b = buffer->get_byte();
774 a = buffer->get_byte();
// Other path: call chk_next() after every byte but the last, then increment
// current_byte.
778 d = buffer->get_byte(); chk_next();
779 c = buffer->get_byte(); chk_next();
780 b = buffer->get_byte(); chk_next();
781 a = buffer->get_byte(); ++current_byte;
// d is the high byte, a the low byte.
784 uint32_t result = (d << 24) | (c << 16) | (b << 8) | (a);
// Reads eight bytes as two four-byte groups, each with its first byte as the
// high-order byte; the first group ends up in the upper 32 bits of result.
// The signature, the declarations of a..d and result, and the braces are not
// shown in this excerpt.
// When current_byte+8 is below buffer->file_tell(): read all bytes directly.
794 if( current_byte+8 < buffer->file_tell() ) {
795 d = buffer->get_byte();
796 c = buffer->get_byte();
797 b = buffer->get_byte();
798 a = buffer->get_byte();
// First group stored in result.
799 result = (d << 24) | (c << 16) | (b << 8) | (a);
800 d = buffer->get_byte();
801 c = buffer->get_byte();
802 b = buffer->get_byte();
803 a = buffer->get_byte();
// Other path: call chk_next() after every byte but the last, then increment
// current_byte.
807 d = buffer->get_byte(); chk_next();
808 c = buffer->get_byte(); chk_next();
809 b = buffer->get_byte(); chk_next();
810 a = buffer->get_byte(); chk_next();
811 result = (d << 24) | (c << 16) | (b << 8) | (a);
812 d = buffer->get_byte(); chk_next();
813 c = buffer->get_byte(); chk_next();
814 b = buffer->get_byte(); chk_next();
815 a = buffer->get_byte(); ++current_byte;
// Shift the first group into the upper half and merge the second group below it.
// NOTE(review): the types of a..d are declared outside this excerpt; if d is
// a signed 32-bit value, (d << 24) could sign-extend into the upper half when
// merged — confirm the declarations.
817 result = (result <<32 ) | (d << 24) | (c << 16) | (b << 8) | (a);
// Sets total_bytes: INT64_MAX for sequential access, otherwise the value of
// path_total_bytes(path). The enclosing signature is not shown in this excerpt.
825 total_bytes = (buffer->access_type & io_SEQUENTIAL) ?
826 INT64_MAX : path_total_bytes(path);
// Fragments that open and close the file through the buffer; the enclosing
// signatures, braces and several statements are not shown in this excerpt.
833 /* Need to perform authentication before reading a single byte. */
834 //zmsgs("1 %s\n", path);
// Return 1 when the buffer's open_file reports a nonzero result.
837 if( buffer->open_file(path) ) return 1;
// A zero result from get_total_bytes(): close the file again.
838 if( !get_total_bytes() ) {
839 buffer->close_file();
// Forward the close to the buffer.
853 buffer->close_file();
// Copies up to len bytes into bfr by repeatedly draining the buffer; the
// opening of the do-loop and some statements are not shown in this excerpt.
// Returns 1 only when result is nonzero and fewer than len bytes were
// copied, otherwise 0.
859 read_data(uint8_t *bfr, int64_t len)
861 int64_t n, count = 0;
862 //zmsgs("1 %d\n", len);
// result starts as the return value of enter().
863 int result = enter();
// Copy what the buffer holds for the remaining request into bfr at count.
865 n = buffer->read_out(&bfr[count],len-count);
// Advance the file position and the running count by the bytes copied.
866 current_byte += n; count += n;
// Repeat while bytes arrived, more are wanted, and sync() returns 0.
867 } while( n > 0 && count < len && !(result=sync()) );
870 //zmsgs("100 %d %d\n", result, count);
871 return (result && count < len) ? 1 : 0;
// Range check on current_byte; the enclosing signature and the assignment of
// current_byte are not shown in this excerpt.
877 //zmsgs("1 %jd\n", byte);
// result is nonzero when current_byte is negative or beyond total_bytes.
879 int result = (current_byte < 0) || (current_byte > total_bytes);
// Moves current_byte by the signed offset bytes and checks the new position;
// the braces and the return are not shown in this excerpt.
884 seek_relative(int64_t bytes)
886 //zmsgs("1 %jd\n", bytes);
887 current_byte += bytes;
// result is nonzero when current_byte is negative or beyond total_bytes.
888 int result = (current_byte < 0) || (current_byte > total_bytes);
// Requests a restart of the buffer; braces and any handling for non-threaded
// buffers are not shown in this excerpt.
892 void zfs_t::restart()
// Nothing to do without a buffer.
894 if( !buffer ) return;
// Nothing to do unless the buffer is single-access.
895 if( !(buffer->access_type & io_SINGLE_ACCESS) ) return;
// Threaded buffer: set its do_restart flag.
896 if( (buffer->access_type & io_THREADED) )
897 buffer->do_restart = 1;
// Return 1 when there is no buffer; the enclosing function is not shown in
// this excerpt.
905 if( !buffer ) return 1;
// Returns -1 when there is no buffer; otherwise calls the buffer's
// write_align(bsz) and returns the result of its start_record(). Braces are
// not shown in this excerpt.
911 start_record(int bsz)
913 if( !buffer ) return -1;
// The return value of write_align is not used here.
914 buffer->write_align(bsz);
915 return buffer->start_record();
// Returns the buffer's stop_record() result, or -1 when there is no buffer;
// the enclosing signature is not shown in this excerpt.
921 return buffer ? buffer->stop_record() : -1;
// Releases this object's reference to the buffer; the enclosing signature is
// not shown in this excerpt.
// Return when there is no buffer, or when ref_count is still positive after
// being decremented.
927 if( !buffer || --buffer->ref_count > 0 ) return;
// Delete the buffer and clear the pointer.
928 delete buffer; buffer = 0;