📄 subsystem.c
字号:
} else if ( c[j] > c[1-j] ){ PR_DBG( 4, "We're running ahead, wait for the other\n" ); st[j].counter--; } else { PR_DBG( 4, "Checking whether we should add or overwrite\n" ); if ( abs( t[j] - t[1-j] ) < abs( context->time_prepare - t[1-j] ) ){ // We have definitely a 'newer' value, and thus we can // increase the counter PR_DBG( 3, "A new entry\n" ); } else { PR_DBG( 3, "New entry is more near: %li against %li" " -> overwriting\n", abs( context->time_prepare - t[1-j] ) / 100000, abs( t[j] - t[1-j] ) / 1000 ); st[j].counter--; } } } st[j].counter++; st[j].time_d[ st[j].counter ] = context->time_prepare; PR_DBG( 4, "Entry: st%i[%i] at %li is ", j, st[j].counter, context->time_prepare / 100000 ); // Writing the value switch( st[j].pt ) { case INT: d_int = ((int*)st[j].data) + st[j].counter; swr_sdb_get_stats_parameter_value( context->id, st[j].pi, d_int ); PR_DBG( 2, "Integer: %i\n", *d_int ); break; case DOUBLE: d_double = ((double*)st[j].data) + st[j].counter; swr_sdb_get_stats_parameter_value( context->id, st[j].pi, d_double ); PR_DBG( 2, "Double: %g\n", *d_double ); break; case COMPLEX: d_complex = ((SYMBOL_COMPLEX*)st[j].data) + st[j].counter; swr_sdb_get_stats_parameter_value( context->id, st[j].pi, d_complex ); PR_DBG( 2, "Complex: %i+%ii\n", d_complex->real, d_complex->imag ); break; default: PR_DBG( 1, "Accepted to track value that " "I don't know how to track\n" ); break; } } } }}void *subsys_thread( void *data );void make_thread( swr_sdb_t *context ) { int error; pthread_mutex_lock( &context->mutex ); // Check if we ain't have one already if ( IS_STATUS( THREAD ) ) { PR_DBG( 1, "Trying to re-threading a module in id %i\n", context->id ); pthread_mutex_unlock( &context->mutex ); return; } SET_STATUS( THREAD ); // Create the thread if ( ( error = swr_thread_init( &context->thread, subsys_thread, context ) ) ) { PR_DBG( 0, "Error %i while creating thread. Dangerous things might happen\n", error ); CLR_STATUS( THREAD ); pthread_mutex_unlock( &context->mutex ); return; } PR_DBG( 2, "Initialised thread for module %i\n", context->id ); pthread_mutex_unlock( &context->mutex );}void finish_thread( swr_sdb_t *context ) { PR_DBG( 2, "Finishing thread\n" ); pthread_mutex_lock( &context->mutex ); if ( IS_STATUS( THREAD ) ) { if ( IS_STATUS( WORKING ) ) { // This is not cool - end the subsystem while it is working... PR_DBG( 0, "ERROR: subsys %i is still working and you want it to quit...\n", context->id ); PR_DBG( 0, "Trying to cancel it\n" ); pthread_cancel( context->thread.thread ); PR_DBG( 0, "If you read this, we've been lucky... let's see\n" ); } CLR_STATUS( THREAD ); PR_DBG( 1, "Sending signal to exit\n" ); pthread_cond_signal( &context->cond ); pthread_mutex_unlock( &context->mutex ); swr_thread_free( &context->thread, NULL ); PR_DBG( 2, "Subsys-thread %i cancelled.\n", context->id ); }}int subsys_process( swr_sdb_t *context, int msg, void *data, swr_sdb_id ret_id ) { swr_port_t *params_out = NULL, *params_in = NULL; int ret = 0; long long int profiling_time; PR_DBG( 3, "vvv Subsys %i, from %i\n", context->id, ret_id ); if ( ( msg == SUBS_MSG_RECONFIG ) || ( msg == SUBS_MSG_RESIZE ) || ( msg == SUBS_MSG_DATA ) || ( msg == SUBS_MSG_USER ) ) { if ( context->rec_count > 1 ){ PR_DBG( 2, "Not verifying params for rec_count > 1 at %i\n", context->id ); } else { // Save the port-sizes if ( IS_STATUS( RESIZE_UP ) ) { PR_DBG( 4, "Getting input\n" ); get_params( context, INPUT, ¶ms_in ); } if ( IS_STATUS( RESIZE_DOWN ) ) { PR_DBG( 4, "Getting output\n" ); get_params( context, OUTPUT, ¶ms_out ); } } } context->profiling.calls[ profiling_total ]++; context->profiling.time[ profiling_total ] -= (long long int)get_time_usec(); switch( msg ) { case SUBS_MSG_EXIT: PR_DBG( 4, "Received message exit\n" ); finish_thread( context ); func( context->cdb_desc->fn_finalize ); break; case SUBS_MSG_CONNECT: PR_DBG( 4, "Received message connect from %i\n", ret_id ); conn_msg( context, (swr_conn_req_t*)data ); break; case SUBS_MSG_DISCONNECT: PR_DBG( 4, "Received message disconnect with %i\n", ret_id ); disconn_msg( context, (swr_conn_req_t*)data ); break; case SUBS_MSG_DATA: profiling_time = (long long int)get_time_usec(); PR_DBG( 4, "Received message data\n" ); if ( rcv_data( context, ret_id, data ) >= 0 ){ // We only profile successful data-proceedings context->profiling.calls[ profiling_data ]++; context->profiling.time[ profiling_data ] += (long long int)get_time_usec() - profiling_time; // We only update the tracks if a data-msg passed its way and // is successful if ( IS_STATUS( TRACKED ) ){ update_track( context ); } } break; case SUBS_MSG_RECONFIG: PR_DBG( 4, "Received message reconfig\n" ); reconfig( context ); break; case SUBS_MSG_RESIZE: PR_DBG( 4, "Received message resize\n" ); resize( context, ret_id, (swr_propagation_t*)data, ¶ms_out, ¶ms_in ); break; case SUBS_MSG_PING: PR_DBG( 4, "Received message ping\n" ); break; case SUBS_MSG_USER: context->profiling.calls[ profiling_msg ]++; context->profiling.time[ profiling_msg ] -= (long long int)get_time_usec(); PR_DBG( 4, "Received user-message\n" ); user_msg( context, ret_id, data ); context->profiling.time[ profiling_msg ] += (long long int)get_time_usec(); break; case SUBS_MSG_THREAD: make_thread( context ); break; case SUBS_MSG_NEW_TRACK: SET_STATUS( TRACKED ); break; case SUBS_MSG_NO_TRACK: CLR_STATUS( TRACKED ); break; case SUBS_MSG_PREPARE: ret = prepare( context, (long int*)data ); break; default: PR_DBG( 0, "%i Received stray msg: %i\n", context->id, msg ); break; } context->profiling.time[ profiling_total ] += (long long int)get_time_usec(); send_data( context ); if ( ( msg == SUBS_MSG_RECONFIG ) || ( msg == SUBS_MSG_RESIZE ) || ( msg == SUBS_MSG_DATA ) || ( msg == SUBS_MSG_USER ) ) { // And see what changed if ( params_in ){ if ( IS_STATUS( RESIZE_UP ) ) { PR_DBG( 4, "Comparing input\n" ); cmp_params( context, INPUT, params_in ); } } if ( params_out ){ if ( IS_STATUS( RESIZE_DOWN ) ) { PR_DBG( 4, "Comparing output\n" ); cmp_params( context, OUTPUT, params_out ); } } if ( !( STATUS( RESIZE_BOTH ) & context->status ) ) { PR_DBG( 4, "Anything new?\n" ); cmp_params_new( context ); } } PR_DBG( 3, "^^^ Subsys %i Returning from subsys-handler\n", context->id ); return ret;}void *subsys_thread( void *arg ) { swr_sdb_t *context = arg; // If something goes wrong, need to cancel immediately pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS, NULL ); while ( IS_STATUS( THREAD ) ) { PR_DBG( 4, "thread: Going to wait for signal\n" ); pthread_mutex_lock( &context->mutex ); CLR_STATUS( WORKING ); pthread_cond_wait( &context->cond, &context->mutex ); PR_DBG( 4, "thread: Signal arrived\n" ); pthread_mutex_unlock( &context->mutex ); PR_DBG( 4, "thread: Mutex unlocked\n" ); if ( IS_STATUS( THREAD ) ) { subsys_process( context, SUBS_MSG_DATA, context->p_data, context->p_ret_id ); PR_DBG( 4, "thread: Processed\n" ); } } // On exit of the subsystem... PR_DBG( 1, "Leaving thread of subsys-id %i\n", context->id ); return 0;}int subsys_handler( swr_sdb_t *context, int msg, void *data, swr_sdb_id ret_id ) { int ret = 0; int in, all; if ( !context ) { PR_DBG( 0, "subsys_handler without context... not good\n" ); breakpoint(); return -1; } // We have to make sure that everything is here... if ( IS_STATUS( MULTI_IN ) && ( msg == SUBS_MSG_DATA ) ){ int nbr_ports = context->cdb_desc->inputs.nbr_ports; PR_DBG( 2, "Multi-input...\n" ); if ( nbr_ports ){ swr_port_t *p = NULL; int discard = 0; // Make sure that a module with multiple inputs gets // them in a nice order, that is, first input-port // gets filled up first. all = 1; for ( in=0; in<nbr_ports; in++ ){ p = &context->port_in[in]; if ( p->sdb_id > 0 ){ if ( !( p->flags & SWR_PORT_DATA ) ){ all = 0; } else if ( !all ){ // Not all ports are filled up sequentially, so we discard discard = 1; } } } // Now, if the last port has data, but not all, then // we discard everything, because it's a nasty state... if ( discard ){ PR_DBG( 2, "Discarding all inputs\n" ); for ( in=0; in<nbr_ports; in++ ){ context->port_in[in].flags &= ~SWR_PORT_DATA; } } // OK, if not all here, we return if ( !all ){ return 0; } } else { PR_DBG( 0, "Multi-inputs without input-port\n" ); } } if ( ( msg == SUBS_MSG_DATA ) && IS_STATUS( THREAD ) ) { // Data-messages get done specially if we have a thread PR_DBG( 2, "Subsys-status: %x\n", context->status ); pthread_mutex_lock( &context->mutex ); if ( IS_STATUS( WORKING ) ) { PR_DBG( 2, "Subsys %i is already processing data.\n", context->id ); pthread_mutex_unlock( &context->mutex ); context->profiling.rework++; return -1; } SET_STATUS( WORKING ); context->p_data = data; context->p_ret_id = ret_id; PR_DBG( 4, "Going to send signal to subsys %i\n", context->id ); pthread_cond_signal( &context->cond ); PR_DBG( 4, "handler: sent signal\n" ); pthread_mutex_unlock( &context->mutex ); PR_DBG( 4, "handler: mutex unlocked\n" ); } else { // Do it the simple way if ( msg == SUBS_MSG_RECONFIG && context->rec_count && !STATUS( PREPARE ) ){ PR_DBG( 2, "Trying to reconfigure while things are happening\n" ); } else { context->rec_count++; ret = subsys_process( context, msg, data, ret_id ); context->rec_count--; } } return ret;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -