// NOTE: this listing is sparse; each fragment below belongs to a routine whose
// header and braces are not visible here.
// Local alias for zmpeg3_t::zloc_t.
4 typedef zmpeg3_t::zloc_t zzloc_t;
// Issues the sched_yield system call directly and returns its result.
9 return syscall(SYS_sched_yield);
// Issues the gettid system call directly and returns its result.
15 return syscall(SYS_gettid);
// Repeats the FUTEX_WAKE request for as long as zfutex returns a negative value.
22 while( (ret=zfutex(FUTEX_WAKE, nwakeups)) < 0 );
// Issues a FUTEX_WAIT request with val and returns whatever zfutex returns.
29 return zfutex(FUTEX_WAIT, val);
// Diagnostic written to stderr for an unlock of a lock that is not held.
35 fprintf(stderr,"unlocking and not locked\n");
// Contended path: taken when v is nonzero or the exchange returns >= 0; the
// loop repeats until zxchg(1,loc) returns a negative value.
// NOTE(review): presumably a negative previous value means "was free" — confirm in zxchg.
42 if( v || zxchg(1,loc) >= 0 ) do {
44 } while ( zxchg(1,loc) >= 0 );
// Two mirrored fragments: adjust loc, then take / release lk.
58 zdecr(loc); lk.lock();
59 zincr(loc); lk.unlock();
// When r is negative loc is decremented, then the caller waits (zwait) for as
// long as the sampled value of loc is non-negative.
73 if( r < 0 ) zdecr(loc);
74 int v; while( (v=loc) >= 0 ) zwait(v);
// Counterpart adjustment: loc is incremented when r is negative.
80 if( r < 0 ) zincr(loc);
// While `blocking` is set: release this lock, pass through lk (lock then
// unlock), and re-acquire before re-testing.
90 while( blocking ) { unlock(); lk.lock(); lk.unlock(); lock(); }
// Drops one user; the last user out calls wake() if `blocking` is set.
99 if( !--users && blocking ) wake();
// Records the calling thread in `blocking`.
107 blocking = pthread_self();
// Waits until `users` reaches zero, releasing the lock around each wait().
110 while( users ) { unlock(); wait(); lock(); }
// Clears `blocking` and releases lk.
118 blocking = 0; lk.unlock();
// Constructor fragment: takes the owning zmpeg3_t and the io_* access flags.
124 buffer_t(zmpeg3_t *zsrc, int access)
127 access_type = access;
// Capacity choice: IO_SIZE unless io_SEQUENTIAL is set, in which case SEQ_IO_SIZE.
128 alloc = !(access_type & io_SEQUENTIAL) ? IO_SIZE : SEQ_IO_SIZE;
// Remembers the thread that constructed the buffer.
134 owner = pthread_self();
// Teardown fragment (presumably the destructor — confirm): the threaded case
// is handled first; its action is on a line not shown here.
140 if( (access_type & io_THREADED) )
// Frees the data array; the null test is redundant because delete[] accepts null.
142 if( data ) delete [] data;
// Reader loop fragment. Cancellation is enabled and set to asynchronous; the
// previous state/type are not retrieved (0 is passed for the out parameters).
163 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,0);
164 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,0);
// Runs until reader_done becomes nonzero.
166 while( !reader_done ) {
// Caps size at alloc - MAX_IO_SIZE before the next read of up to MAX_IO_SIZE.
172 if( size+MAX_IO_SIZE > alloc )
173 size = alloc - MAX_IO_SIZE;
175 int64_t count = read_in(zmpeg3_t::MAX_IO_SIZE);
// Accounts for the bytes just read; the handling when size exceeds alloc is
// on a line not shown here.
180 if( (size+=count) > alloc )
// When src->recd_fd is non-negative and the_writer is set, unblocks write_lock.
182 if( src->recd_fd >= 0 && the_writer )
183 write_lock.unblock();
// Thread entry point: the void* argument is the buffer_t to operate on.
192 reader(void *the_buffer)
194 buffer_t *b = (buffer_t *)the_buffer;
// Writer loop fragment. Each pass requests at most 0x100000 bytes (1 MiB).
202 const int mx_blksz = 0x100000;
// Runs until writer_done becomes nonzero.
204 while( !writer_done ) {
// Mask 0xfff: only whole 0x1000-byte multiples are written per call.
205 write_record(mx_blksz,0xfff);
// Backlog (file_pos - write_pos) below one max block; the action taken is on
// a line not shown here.
206 if( file_pos - write_pos < mx_blksz )
// After the loop: mask 0 and INT_MAX, so no block-size rounding is applied.
212 write_record(INT_MAX, 0);
// Thread entry point: the void* argument is the buffer_t to operate on.
217 writer(void *the_buffer)
219 buffer_t *b = (buffer_t *)the_buffer;
// Opens `path` according to access_type.
225 open_file(char *path)
227 //zmsgs("1 %s\n", path);
// Threaded access also sets the single-access and sequential flags.
229 if( (access_type & io_THREADED) )
230 access_type |= io_SINGLE_ACCESS + io_SEQUENTIAL;
// Buffered mode: stdio stream opened read-only in binary mode.
231 if( !(access_type & io_UNBUFFERED) ) {
233 if( !(fp=::fopen(path, "rb")) ) {
// Unbuffered mode: raw descriptor, with O_NONBLOCK added when io_NONBLOCK is set.
241 int mode = (access_type & io_NONBLOCK) ? O_RDONLY+O_NONBLOCK : O_RDONLY;
242 if( (fd=::open(path, mode)) < 0 ) {
// Threaded follow-up; the action is on a line not shown here.
248 if( (access_type & io_THREADED) )
// Reset fragment: clears the ring indices, the byte count and the positions.
260 in = fin = out = size = 0;
261 file_pos = out_pos = file_nudge = 0;
// Non-sequential sources are rewound to offset 0 via fseek (buffered) or
// lseek (unbuffered). The error messages print file_pos, which was zeroed above.
269 if( !(access_type & io_SEQUENTIAL) ) {
270 if( !(access_type & io_UNBUFFERED) ) {
271 if( ::fseek(fp,0,SEEK_SET) < 0 )
272 perrs("fseek %jd", file_pos);
275 if( ::lseek(fd,0,SEEK_SET) < 0 )
276 perrs("lseek %jd", file_pos);
// Starts the reader thread on this buffer; the pthread_create result is not checked.
287 pthread_create(&the_reader,0,reader,this);
// Stop fragment: nothing to do if the reader is already done or was never started.
293 if( reader_done || !the_reader ) return;
// Polls every 100 ms while reader_done is non-negative and the tmo budget lasts
// (tmo is initialised on a line not shown here).
297 while( reader_done >= 0 && --tmo >= 0 ) usleep(100000);
// Budget exhausted: request cancellation of the reader thread.
298 if( tmo < 0 ) pthread_cancel(the_reader);
// Starts the writer thread on this buffer; the pthread_create result is not checked.
307 pthread_create(&the_writer,0,writer,this);
// Stop fragment: returns 1 if the writer is already done or was never started.
314 if( writer_done || !the_writer ) return 1;
// Unblocks write_lock, then joins the writer thread.
316 write_lock.unblock();
317 pthread_join(the_writer,0);
// Polls every 100 ms while writer_done is non-negative and the tmo budget lasts,
// then requests cancellation if the budget ran out.
// NOTE(review): the control flow between the join above and this loop is on
// lines not shown — confirm which path reaches the cancel.
319 while( writer_done >= 0 && --tmo >= 0 ) usleep(100000);
320 if( tmo < 0 ) pthread_cancel(the_writer);
// Alignment fragment: looks for a stream start pattern in the ring buffer and
// sets write_pos from the scan result.
// Search window: twice the packet size, or sz if that is larger.
328 int isz = 2*src->packet_size;
329 if( sz > isz ) isz = sz;
// Pattern to find: bytes 00 00 01 ba for a program stream ...
332 if( src->is_program_stream() ) {
333 static uint8_t pack_start[4] = { 0x00, 0x00, 0x01, 0xba };
334 pat = pack_start; psz = sizeof(pack_start);
// ... or the single byte 0x47 for a transport stream.
336 else if( src->is_transport_stream() ) {
337 static uint8_t sync_start[1] = { 0x47 };
338 pat = sync_start; psz = sizeof(sync_start);
// The window cannot exceed out_pos.
340 if( isz > out_pos ) isz = out_pos;
// bsz: bytes between out_pos and file_pos; the window is also limited to the
// remainder of the buffer (alloc - bsz, or 0 when bsz exceeds alloc).
341 int64_t bsz = file_pos - out_pos;
342 int i = bsz>alloc ? 0 : alloc-bsz;
343 if( isz > i ) isz = i;
// i is recomputed on a line not shown here, then wrapped into [0, alloc).
345 if( i < 0 ) i += alloc;
346 uint8_t *bfr = &data[i];
347 uint8_t *lmt = &data[alloc];
348 int64_t len = isz + bsz;
// Byte-wise scan over len bytes, wrapping bfr at the end of data.
// NOTE(review): on a mismatch i is reset to 0 without re-testing the current
// byte against pat[0], so a match that overlaps a failed partial match
// (e.g. 00 00 00 01 ba) would be missed — confirm whether this matters.
351 while( --len >= 0 ) {
352 if( *bfr++ != pat[i] ) { i = 0; }
353 else if( ++i >= psz ) break;
354 if( bfr >= lmt ) bfr = &data[0];
// Pattern not found within the window.
357 if( len < 0 ) return 1;
// Step back to the first byte of the match, wrapping if needed.
359 if( (bfr-=psz) < &data[0] ) bfr += alloc;
361 write_pos = file_pos - len;
365 // write blocks of (mask+1) bytes of data at data+wout, update wout
366 // only write full blocks, fragments cause disk thrashing
// sz caps the number of bytes written by this call; mask selects the block size.
368 write_record(int sz, int mask)
// Pending bytes, capped at sz, then rounded down by clearing the mask bits;
// nothing is written if that leaves zero.
370 int isz = file_pos - write_pos;
371 if( isz > sz ) isz = sz;
372 if( !(isz &= ~mask) ) return;
373 //zmsgs(" isz=%d, file_pos=%ld, write_pos=%ld\n", isz, file_pos, write_pos);
// First piece runs from wout toward the end of the buffer (`len` is set on a
// line not shown here).
376 int n = alloc - wout;
377 if( n > len ) n = len;
379 src->write_record(&data[wout],n);
// Wrapped piece from the start of the buffer; wout becomes its length.
383 src->write_record(&data[0],wout=len);
// Close fragment: acts only when the last open reference is dropped.
391 if( open_count > 0 && --open_count == 0 ) {
// Threaded follow-up; the action is on a line not shown here.
392 if( (access_type & io_THREADED) )
// The handle is closed only when there is no src or src->iopened is zero.
394 if( !src || !src->iopened ) {
395 if( !(access_type & io_UNBUFFERED) ) {
396 if( fp != 0 ) { fclose(fp); fp = 0; }
399 if( fd >= 0 ) { close(fd); fd = -1; }
405 /* sole user of in ptr (except restart) */
// Fill loop: reads into data[in] until len bytes have been counted.
410 while( count < len ) {
// Each transfer is limited to the bytes left before the end of the buffer.
411 int xfr = len - count;
412 int avl = alloc - in;
413 if( avl < xfr ) xfr = avl;
414 if( access_type & io_UNBUFFERED ) {
415 if( access_type & io_NONBLOCK ) {
// Non-blocking descriptor: wait up to 200 ms per attempt for readability,
// then read; avl takes the byte count once a read returns data.
417 for( int i=10; avl<0 && --i>=0; ) { // 10 retries, 2 seconds
418 struct timeval tv; tv.tv_sec = 0; tv.tv_usec = 200000;
419 fd_set rd_fd; FD_ZERO(&rd_fd); FD_SET(fd, &rd_fd);
420 int ret = select(fd+1, &rd_fd, 0, 0, &tv);
423 if( !FD_ISSET(fd, &rd_fd) ) continue;
424 if( (ret=::read(fd, &data[in], xfr)) > 0 ) avl = ret;
// Blocking descriptor read.
428 avl = ::read(fd, &data[in], xfr);
// Buffered stream read; a zero-length read with the stream error flag set is
// turned into -1.
431 avl = ::fread(&data[in], 1, xfr, fp);
432 if( !avl && ferror(fp) ) avl = -1;
// Error handling fragment (the enclosing condition is not shown here).
435 if( errno == EOVERFLOW ) {
// Unless io_ERRFAIL is set, the failed span is replaced by zeros; while errs is
// below IO_ERR_LIMIT the span is shortened to ERR_PACKET_SIZE.
440 if( !(access_type & io_ERRFAIL) ) {
441 if( errs < IO_ERR_LIMIT && xfr > ERR_PACKET_SIZE )
442 xfr = ERR_PACKET_SIZE;
443 memset(&data[in],0,xfr);
// Non-threaded sources are repositioned just past the zero-filled span.
444 if( !(access_type & io_THREADED) ) {
445 int64_t pos = file_pos + count + xfr;
446 if( (access_type & io_UNBUFFERED) ) {
447 if( ::lseek(fd,pos,SEEK_SET) < 0 )
448 perrs("lseek pos %jx",pos);
451 if( ::fseek(fp,pos,SEEK_SET) < 0 )
452 perrs("fseek pos %jx",pos);
458 perrs("read pos %jx",file_pos + count);
// No data read ends the loop; while paused the loop simply retries.
464 if( avl == 0 ) break;
465 if( paused ) continue;
// Wraps the write index once it reaches the end of the buffer.
468 if( (avl=in-alloc) >= 0 ) in = avl;
// Wait fragment for threaded buffers: loops while the reader is still running,
// no restart has happened and file_pos has not passed pos (loop body not shown).
477 if( (access_type & io_THREADED) && !restarted ) {
478 while( !reader_done && !restarted && file_pos <= pos ) {
// Result is 1 when the wait ended because the reader finished or a restart occurred.
484 if( reader_done || restarted ) result = 1;
// Seek fragment: only acts when the requested pos differs from file_pos.
493 if( file_pos != pos ) {
// Seekable source: result is 0 on a successful lseek64 / fseeko, otherwise 1.
494 if( !(access_type & io_SEQUENTIAL) ) {
495 result = (access_type & io_UNBUFFERED) ?
496 (lseek64(fd, pos, SEEK_SET) >= 0 ? 0 : 1) :
497 (fseeko(fp, pos, SEEK_SET) == 0 ? 0 : 1) ;
// Sequential source asked for a nonzero position: the threaded case is handled
// on a line not shown here; the message below reports the attempted seek.
499 else if( pos != 0 ) {
500 if( (access_type & io_THREADED) )
502 zerrs("seek on sequential from %jd to %jd\n", file_pos, pos);
// Reads len bytes into the buffer via read_in and updates the byte count.
// Returns 0 when both len and the count read are nonzero, otherwise 1.
514 read_fin(int64_t len)
516 int64_t count = read_in(len);
517 /* must already be locked */
// Handling for size exceeding alloc is on a line not shown here.
520 if( (size+=count) > alloc )
522 return len && count ? 0 : 1;
// Repositions the source to pos and reads len bytes from there.
526 seek_to(int64_t pos, int64_t len)
528 //zmsgs(" pos=%ld, len=%ld\n", pos, len);
529 int result = seek_in(pos);
// Both the file and output positions are set to pos
// (any condition guarding this is not shown here).
531 file_pos = out_pos = pos;
533 result = read_fin(len);
// Forward-read fragment: reads the distance from file_pos up to pos.
541 //zmsgs(" pos=%ld\n", pos);
543 int64_t len = pos - file_pos;
// Reported when pos lies before file_pos (the test itself is not shown here).
545 zerrs("reversed seq read (%jd < %jd)\n", pos, file_pos);
549 result = read_fin(len);
// Reposition fragment: makes the buffer cover pos and points the output index at it.
// start_pos is file_pos minus the buffered byte count.
558 int64_t start_pos = file_pos - size;
559 if( pos < start_pos ) { /* before buffer */
560 if( (access_type & io_THREADED) ) {
// Threaded buffers cannot go back: the shortfall, measured from the middle of
// the buffered range, is added to file_nudge.
562 zerrs("threaded sync before buffer %jd < %jd\n", pos, start_pos);
563 int64_t mid_pos = start_pos + size/2;
564 file_nudge += mid_pos - pos;
568 restart(0); /* allow fake seek to start */
// result: 0 when pos is below file_pos; otherwise wait_in(pos) for threaded
// buffers, or -1 for non-threaded ones.
576 result = pos < file_pos ? 0 : /* in buffer */
577 (access_type & io_THREADED) ?
578 wait_in(pos) : -1; /* after buffer */
579 if( result < 0 ) { /* out of buffer */
580 //zmsgs("out of buffer pos=%ld, start_pos=%ld, file_pos=%ld, out_pos=%ld\n",
581 // pos, start_pos, file_pos, out_pos);
// Target window of alloc bytes centred on pos; end_pos is computed before
// seek_pos is clamped to zero.
582 int64_t seek_pos = pos - alloc/2;
583 int64_t end_pos = seek_pos + alloc;
584 if( seek_pos < 0 ) seek_pos = 0;
// Sequential sources, or a window that file_pos already falls into, read
// forward to end_pos; otherwise seek to seek_pos and refill alloc bytes.
585 result = (access_type & io_SEQUENTIAL) ||
586 (seek_pos < file_pos && end_pos >= file_pos) ?
587 read_to(end_pos) : seek_to(seek_pos, alloc);
// If pos is within the buffered range, place `out` that many bytes behind
// `fin` (wrapping) and record pos as the output position.
590 int64_t offset = file_pos - pos;
591 if( offset >= 0 && offset <= size ) {
592 if( (offset=fin-offset) < 0 ) offset += alloc;
593 out = offset; out_pos = pos;
// Copies up to len buffered bytes from the ring buffer into bfr.
602 read_out(uint8_t *bfr,int len)
605 while( count < len ) {
// Stops when nothing is buffered past out_pos.
606 int avail = file_pos - out_pos;
607 if( avail <= 0 ) break;
// Each copy is limited to the contiguous run before the end of the buffer.
608 int fragment_size = alloc - out;
609 if( fragment_size > avail ) fragment_size = avail;
// `avail` is presumably reassigned on the line not shown between these two
// clamps — confirm before treating the second clamp as redundant.
611 if( fragment_size > avail ) fragment_size = avail;
612 memcpy(bfr, &data[out], fragment_size);
613 bfr += fragment_size;
614 out += fragment_size;
// Wraps the read index at the end of the buffer.
615 if( out >= alloc ) out = 0;
616 out_pos += fragment_size;
617 count += fragment_size;
// Resolves the /proc/self/fd/<fd> symlink into nm (at most sz bytes) and
// returns readlink's result.
// NOTE(review): readlink returns a byte count (or -1) and does not append a
// terminating NUL — confirm callers terminate nm.
622 static int fd_name(int fd,char *nm,int sz)
625 snprintf(&pfn[0],sizeof(pfn),"/proc/self/fd/%d",fd);
626 return readlink(&pfn[0],nm,sz);
// Constructs a file source from a path; a buffer_t is created with the given
// access flags.
630 fs_t(zmpeg3_t *zsrc, const char *fpath, int access)
634 buffer = new buffer_t(src, access);
// Single-access follow-up; the action is on a line not shown here.
635 if( (access & io_SINGLE_ACCESS) )
// Constructs a file source from an already open descriptor.
640 fs_t(zmpeg3_t *zsrc, int fd, int access)
// Descriptor sources are always unbuffered and single-access; threaded use
// also implies sequential.
643 access |= io_UNBUFFERED;
644 access |= io_SINGLE_ACCESS;
645 if( (access & io_THREADED) ) access |= io_SEQUENTIAL;
646 buffer = new buffer_t(src, access);
// NOTE(review): fd_name returns readlink's result, so this branch runs only
// when that result is 0 — confirm this is the intended test.
647 if( !fd_name(fd,&path[0],sizeof(path)) )
648 css.get_keys(&path[0]);
651 buffer->open_count = 1;
// Threaded buffers start their reader thread immediately.
653 if( (buffer->access_type & io_THREADED) )
654 buffer->start_reader();
// Constructs a file source from an already open stdio stream.
658 fs_t(zmpeg3_t *zsrc, FILE *fp, int access)
// Stream sources are always buffered and single-access; threaded use also
// implies sequential.
661 access &= ~io_UNBUFFERED;
662 access |= io_SINGLE_ACCESS;
663 if( (access & io_THREADED) ) access |= io_SEQUENTIAL;
664 buffer = new buffer_t(src, access);
// NOTE(review): fd_name returns readlink's result, so this branch runs only
// when that result is 0 — confirm this is the intended test.
666 if( !fd_name(fd,&path[0],sizeof(path)) )
667 css.get_keys(&path[0]);
670 buffer->open_count = 1;
// Threaded buffers start their reader thread immediately.
672 if( (buffer->access_type & io_THREADED) )
673 buffer->start_reader();
// Copy fragment: duplicates the path and size of another fs (`fs`).
687 strcpy(path, fs.path);
688 total_bytes = fs.total_bytes;
// Nothing more to copy when the source has no buffer.
690 if( !fs.buffer ) return;
691 int access = fs.buffer->access_type;
// Single-access: the open state is copied and the buffer's open_count is
// bumped (the buffer pointer assignment is on a line not shown here).
692 if( (access & io_SINGLE_ACCESS) ) {
695 is_open = fs.is_open;
697 ++buffer->open_count;
// Otherwise a separate buffer_t is created with the same access flags.
700 buffer = new buffer_t(src, access);
// Single-byte fragments: one uses buffer->next_byte(), the other buffer->get_byte().
707 uint32_t ret = buffer->next_byte();
716 uint32_t result = buffer->get_byte();
// 16-bit read. When current_byte+2 is below buffer->file_tell() both bytes are
// fetched back to back; otherwise chk_next() runs between bytes.
727 if( current_byte+2 < buffer->file_tell() ) {
728 b = buffer->get_byte();
729 a = buffer->get_byte();
733 b = buffer->get_byte(); chk_next();
734 a = buffer->get_byte(); ++current_byte;
// The byte read first is the most significant.
737 uint32_t result = (b << 8) | (a);
// 24-bit read, same two-path structure with a 3-byte threshold.
746 if( current_byte+3 < buffer->file_tell() ) {
747 c = buffer->get_byte();
748 b = buffer->get_byte();
749 a = buffer->get_byte();
753 c = buffer->get_byte(); chk_next();
754 b = buffer->get_byte(); chk_next();
755 a = buffer->get_byte(); ++current_byte;
758 uint32_t result = (c << 16) | (b << 8) | (a);
// 32-bit read, same two-path structure with a 4-byte threshold.
767 if( current_byte+4 < buffer->file_tell() ) {
768 d = buffer->get_byte();
769 c = buffer->get_byte();
770 b = buffer->get_byte();
771 a = buffer->get_byte();
775 d = buffer->get_byte(); chk_next();
776 c = buffer->get_byte(); chk_next();
777 b = buffer->get_byte(); chk_next();
778 a = buffer->get_byte(); ++current_byte;
781 uint32_t result = (d << 24) | (c << 16) | (b << 8) | (a);
// 64-bit read assembled from two 32-bit halves, most significant byte first.
// Fast path when current_byte+8 is below buffer->file_tell().
791 if( current_byte+8 < buffer->file_tell() ) {
792 d = buffer->get_byte();
793 c = buffer->get_byte();
794 b = buffer->get_byte();
795 a = buffer->get_byte();
796 result = (d << 24) | (c << 16) | (b << 8) | (a);
797 d = buffer->get_byte();
798 c = buffer->get_byte();
799 b = buffer->get_byte();
800 a = buffer->get_byte();
// Slow path: chk_next() runs after every byte except the last, which bumps
// current_byte instead.
804 d = buffer->get_byte(); chk_next();
805 c = buffer->get_byte(); chk_next();
806 b = buffer->get_byte(); chk_next();
807 a = buffer->get_byte(); chk_next();
808 result = (d << 24) | (c << 16) | (b << 8) | (a);
809 d = buffer->get_byte(); chk_next();
810 c = buffer->get_byte(); chk_next();
811 b = buffer->get_byte(); chk_next();
812 a = buffer->get_byte(); ++current_byte;
// The first half moves to the upper 32 bits and the second half fills the lower.
// NOTE(review): the declared types of a..d and result are not visible; if a..d
// are signed int, (d << 24) can go negative and sign-extend into the upper
// half — confirm.
814 result = (result <<32 ) | (d << 24) | (c << 16) | (b << 8) | (a);
// Size fragment: sequential buffers report INT64_MAX, others use
// path_total_bytes(path).
822 total_bytes = (buffer->access_type & io_SEQUENTIAL) ?
823 INT64_MAX : path_total_bytes(path);
// Open fragment.
830 /* Need to perform authentication before reading a single byte. */
831 //zmsgs("1 %s\n", path);
// Returns 1 when the buffer cannot open the path.
834 if( buffer->open_file(path) ) return 1;
// A zero total size closes the file again (the value returned is not shown here).
835 if( !get_total_bytes() ) {
836 buffer->close_file();
// Close fragment: delegates to the buffer.
850 buffer->close_file();
// Reads len bytes into bfr, advancing current_byte by the bytes copied.
// Returns 1 only when an error was recorded and fewer than len bytes were
// copied; otherwise 0.
856 read_data(uint8_t *bfr, int64_t len)
858 int64_t n, count = 0;
859 //zmsgs("1 %d\n", len);
860 int result = enter();
// Copies what the buffer holds, then calls sync() before the next pass; the
// loop ends on an empty copy, on completion, or when sync() returns nonzero.
862 n = buffer->read_out(&bfr[count],len-count);
863 current_byte += n; count += n;
864 } while( n > 0 && count < len && !(result=sync()) );
867 //zmsgs("100 %d %d\n", result, count);
868 return (result && count < len) ? 1 : 0;
// Absolute seek fragment: result is nonzero when current_byte falls outside
// [0, total_bytes].
874 //zmsgs("1 %jd\n", byte);
876 int result = (current_byte < 0) || (current_byte > total_bytes);
// Moves current_byte by `bytes` (may be negative) and applies the same range check.
881 seek_relative(int64_t bytes)
883 //zmsgs("1 %jd\n", bytes);
884 current_byte += bytes;
885 int result = (current_byte < 0) || (current_byte > total_bytes);
// Requests a restart of the underlying buffer. Does nothing without a buffer
// or when the buffer is not single-access.
889 void zfs_t::restart()
891 if( !buffer ) return;
892 if( !(buffer->access_type & io_SINGLE_ACCESS) ) return;
// Threaded buffers are flagged via do_restart (the non-threaded handling is
// not shown here).
893 if( (buffer->access_type & io_THREADED) )
894 buffer->do_restart = 1;
// Fragment of another routine: returns 1 when there is no buffer.
902 if( !buffer ) return 1;
// Starts recording: aligns the write position using bsz, then starts the
// buffer's recorder. Returns -1 without a buffer, otherwise the buffer's result.
908 start_record(int bsz)
910 if( !buffer ) return -1;
// The result of write_align is not checked.
911 buffer->write_align(bsz);
912 return buffer->start_record();
// Stop fragment: returns the buffer's stop_record() result, or -1 without a buffer.
918 return buffer ? buffer->stop_record() : -1;
// Release fragment: drops one reference and deletes the buffer once the count
// is no longer positive.
924 if( !buffer || --buffer->ref_count > 0 ) return;
925 delete buffer; buffer = 0;