test_cdma_rx.py

#!/usr/bin/env python

from gnuradio import gr, gru, modulation_utils, blks2, howto
from gnuradio import usrp
from gnuradio import eng_notation
from gnuradio.eng_option import eng_option
from optparse import OptionParser
from pick_bitrate import pick_rx_bitrate
import spreading_dbpsk
import usrp_options
import random
import struct
import sys

# from current dir
import usrp_receive_path

#import os
#print os.getpid()
#raw_input('Attach and press enter: ')

class my_top_block(gr.top_block):
    def __init__(self, demodulator, options):
        gr.top_block.__init__(self)

        if options.rx_freq is None:
            sys.stderr.write("-f FREQ or --freq FREQ or --rx-freq FREQ must be specified\n")
            raise SystemExit
        """
        self.rx_path = receive_path.receive_path(demodulator, options)
        for attr in dir(self.rx_path): #forward the methods
            if not attr.startswith('_') and not hasattr(self, attr):
                 setattr(self, attr, getattr(self.rx_path, attr))
        """
        #setup usrp
        self._demod_class = demodulator
        self._setup_usrp_source(options)

        #####printf#####
        self.uchar2float = gr.uchar_to_float()
        self.prt = howto.print_ff()
        self.float2uchar = gr.float_to_uchar()
        #self.prt = howto.print_cc()
        ################
        
        # make demodulator
        demod_kwargs = self._demod_class.extract_kwargs_from_options(options)
        self.demodulator = self._demod_class(**demod_kwargs) 

        self._verbose            = options.verbose
        self._bitrate            = options.bitrate         # desired bit rate
        self._samples_per_symbol = options.samples_per_symbol  # desired samples/symbol
        # Design filter to get actual channel we want
        sw_decim = 1
        chan_coeffs = gr.firdes.low_pass (1.0,                  # gain
                                          sw_decim * self._samples_per_symbol, # sampling rate
                                          1.0,                  # midpoint of trans. band
                                          0.5,                  # width of trans. band
                                          gr.firdes.WIN_HANN)   # filter type
        self.channel_filter = gr.fft_filter_ccc(sw_decim, chan_coeffs)
        
        # Carrier Sensing Blocks
        alpha = 0.001
        thresh = 30   # in dB, will have to adjust
        self.probe = gr.probe_avg_mag_sqrd_c(thresh,alpha)

        #create sink
        self.sink = gr.vector_sink_b()
        
        # Display some information about the setup
        if self._verbose:
            self._print_verbage() 

        # connect block input to channel filter
        self.connect(self.u, self.channel_filter)
        # connect the channel input filter to the carrier power detector
        self.connect(self.channel_filter, self.probe)
        # connect channel filter to descrambler
        self.connect(self.channel_filter, self.demodulator)
        self.connect(self.demodulator, self.uchar2float)
        self.connect(self.uchar2float, self.prt)
        self.connect(self.prt, self.float2uchar)
        #self.connect(self.demodulator, self.sink)
        self.connect(self.float2uchar, self.sink)
       
    def _setup_usrp_source(self, options):
            self.u = usrp_options.create_usrp_source(options)
            adc_rate = self.u.adc_rate()
            if options.verbose:
                print 'USRP Source:', self.u
            (self._bitrate, self._samples_per_symbol, self._decim) = \
                            pick_rx_bitrate(options.bitrate, self._demod_class.bits_per_symbol(), \
                                            options.samples_per_symbol, options.decim, adc_rate,  \
                                            self.u.get_decim_rates())
    
            self.u.set_decim(self._decim)
    
            if not self.u.set_center_freq(options.rx_freq):
                print "Failed to set Rx frequency to %s" % (eng_notation.num_to_str(options.rx_freq))
                raise ValueError, eng_notation.num_to_str(options.rx_freq)
    def bitrate(self):
        return self._bitrate 

    def samples_per_symbol(self):
        return self._samples_per_symbol

    def carrier_sensed(self):
        """
        Return True if we think carrier is present.
        """
        #return self.probe.level() > X
        return self.probe.unmuted()

    def carrier_threshold(self):
        """
        Return current setting in dB.
        """
        return self.probe.threshold()

    def set_carrier_threshold(self, threshold_in_db):
        """
        Set carrier threshold. 

        @param threshold_in_db: set detection threshold
        @type threshold_in_db:  float (dB)
        """
        self.probe.set_threshold(threshold_in_db)   
        
    def add_options(normal, expert):
        """
        Adds receiver-specific options to the Options Parser
        """
        if not normal.has_option("--bitrate"):
            normal.add_option("-r", "--bitrate", type="eng_float", default=100e3,
                              help="specify bitrate [default=%default].")
        normal.add_option("-v", "--verbose", action="store_true", default=False)
        expert.add_option("-S", "--samples-per-symbol", type="int", default=2,
                          help="set samples/symbol [default=%default]")
        expert.add_option("", "--log", action="store_true", default=False,
                          help="Log all parts of flow graph to files (CAUTION: lots of data)") 

    # Make a static method to call before instantiation
    add_options = staticmethod(add_options)
 

    def _print_verbage(self):
        """
        Prints information about the receive path
        """
        print "\nReceive Path:"
        print "modulation:      %s"    % (self._demod_class.__name__)
        print "bitrate:         %sb/s" % (eng_notation.num_to_str(self._bitrate))
        print "samples/symbol:  %3d"   % (self._samples_per_symbol) 

# /////////////////////////////////////////////////////////////////////////////
#                                   main
# ///////////////////////////////////////////////////////////////////////////// 

global n_rcvd, n_right

def main():
    global n_rcvd, n_right
    
    n_rcvd = 0
    n_right = 0
    """    
    def rx_callback(ok, payload):
        global n_rcvd, n_right
        (pktno,) = struct.unpack('!H', payload[0:2])
        n_rcvd += 1
        if ok:
            n_right += 1

        print "ok = %5s  pktno = %4d  n_rcvd = %4d  n_right = %4d" % (
            ok, pktno, n_rcvd, n_right)
    """    
    demods = modulation_utils.type_1_demods() 

    # Create Options Parser:
    parser = OptionParser (option_class=eng_option, conflict_handler="resolve")
    expert_grp = parser.add_option_group("Expert") 

    parser.add_option("-m", "--modulation", type="choice", choices=demods.keys(), 
                      default='dbpsk', 
                      help="Select modulation from: %s [default=%%default]"
                            % (', '.join(demods.keys()),))

    usrp_receive_path.add_options(parser, expert_grp)

    for mod in demods.values():
        mod.add_options(expert_grp) 

    (options, args) = parser.parse_args () 

    if len(args) != 0:
        parser.print_help(sys.stderr)
        sys.exit(1)

    if options.rx_freq is None:
        sys.stderr.write("You must specify -f FREQ or --freq FREQ\n")
        parser.print_help(sys.stderr)
        sys.exit(1)
 
    # build the graph
    tb = my_top_block(demods[options.modulation], options) 

    r = gr.enable_realtime_scheduling()
    if r != gr.RT_OK:
        print "Warning: Failed to enable realtime scheduling."

    tb.start()        # start flow graph
    tb.wait()         # wait for it to finish 

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        pass

最終更新:2011年06月29日 02:27