test_cdma_rx.py

「test_cdma_rx.py」の編集履歴(バックアップ)一覧に戻る

test_cdma_rx.py - (2011/06/29 (水) 02:27:46) のソース

 #!/usr/bin/env python
 
 from gnuradio import gr, gru, modulation_utils, blks2, howto
 from gnuradio import usrp
 from gnuradio import eng_notation
 from gnuradio.eng_option import eng_option
 from optparse import OptionParser
 from pick_bitrate import pick_rx_bitrate
 import spreading_dbpsk
 import usrp_options
 import random
 import struct
 import sys
 
 # from current dir
 import usrp_receive_path
 
 #import os
 #print os.getpid()
 #raw_input('Attach and press enter: ')
 
 class my_top_block(gr.top_block):
     """
     Receive-only flow graph.

     Signal path: USRP source -> channel filter -> demodulator ->
     uchar_to_float -> howto.print_ff -> float_to_uchar -> vector sink.
     A carrier-sense probe is also attached to the channel filter output.
     """

     def __init__(self, demodulator, options):
         """
         Build and wire the receive flow graph.

         @param demodulator: demodulator class; it must provide
                             extract_kwargs_from_options() and bits_per_symbol()
         @param options: parsed command-line options; rx_freq, verbose,
                         bitrate, samples_per_symbol and decim are read
         @raise SystemExit: (status 1) if options.rx_freq is None
         @raise ValueError: if the USRP rejects the requested Rx frequency
         """
         gr.top_block.__init__(self)

         if options.rx_freq is None:
             sys.stderr.write("-f FREQ or --freq FREQ or --rx-freq FREQ must be specified\n")
             # A bare SystemExit would report success (status 0) to the shell.
             raise SystemExit(1)

         # Set up the USRP first; this also assigns self._bitrate,
         # self._samples_per_symbol and self._decim.
         self._demod_class = demodulator
         self._setup_usrp_source(options)

         # Debug tap: the demodulator's byte output is converted to float,
         # routed through howto.print_ff and converted back to bytes.
         # (print_ff presumably prints each sample -- confirm in the howto module.)
         self.uchar2float = gr.uchar_to_float()
         self.prt = howto.print_ff()
         self.float2uchar = gr.float_to_uchar()

         # make demodulator
         demod_kwargs = self._demod_class.extract_kwargs_from_options(options)
         self.demodulator = self._demod_class(**demod_kwargs)

         # NOTE(review): the next two assignments replace the values that
         # _setup_usrp_source() obtained from pick_rx_bitrate() with the
         # values requested on the command line -- confirm this is intended.
         self._verbose            = options.verbose
         self._bitrate            = options.bitrate         # desired bit rate
         self._samples_per_symbol = options.samples_per_symbol  # desired samples/symbol

         # Design filter to get actual channel we want
         sw_decim = 1
         chan_coeffs = gr.firdes.low_pass(1.0,                  # gain
                                          sw_decim * self._samples_per_symbol, # sampling rate
                                          1.0,                  # midpoint of trans. band
                                          0.5,                  # width of trans. band
                                          gr.firdes.WIN_HANN)   # filter type
         self.channel_filter = gr.fft_filter_ccc(sw_decim, chan_coeffs)

         # Carrier Sensing Blocks
         alpha = 0.001
         thresh = 30   # in dB, will have to adjust
         self.probe = gr.probe_avg_mag_sqrd_c(thresh, alpha)

         # Byte sink at the end of the chain.
         # NOTE(review): vector_sink_b presumably keeps every byte it receives,
         # so memory use would grow while the graph runs -- confirm.
         self.sink = gr.vector_sink_b()

         # Display some information about the setup
         if self._verbose:
             self._print_verbage()

         # USRP output feeds the channel filter
         self.connect(self.u, self.channel_filter)
         # channel filter output feeds the carrier power detector
         self.connect(self.channel_filter, self.probe)
         # channel filter -> demodulator -> debug print tap -> byte sink
         self.connect(self.channel_filter, self.demodulator,
                      self.uchar2float, self.prt, self.float2uchar, self.sink)

     def _setup_usrp_source(self, options):
         """
         Create the USRP source, pick the decimation via pick_rx_bitrate()
         and tune to options.rx_freq.

         Assigns self.u, self._bitrate, self._samples_per_symbol and
         self._decim.

         @param options: parsed options; verbose, bitrate, samples_per_symbol,
                         decim and rx_freq are read
         @raise ValueError: if set_center_freq() reports failure
         """
         self.u = usrp_options.create_usrp_source(options)
         adc_rate = self.u.adc_rate()
         if options.verbose:
             print("USRP Source: %s" % (self.u,))
         (self._bitrate, self._samples_per_symbol, self._decim) = pick_rx_bitrate(
             options.bitrate, self._demod_class.bits_per_symbol(),
             options.samples_per_symbol, options.decim, adc_rate,
             self.u.get_decim_rates())

         self.u.set_decim(self._decim)

         if not self.u.set_center_freq(options.rx_freq):
             freq_str = eng_notation.num_to_str(options.rx_freq)
             print("Failed to set Rx frequency to %s" % (freq_str,))
             raise ValueError(freq_str)

     def bitrate(self):
         """
         Return the stored bit rate (self._bitrate).
         """
         return self._bitrate

     def samples_per_symbol(self):
         """
         Return the stored samples-per-symbol value.
         """
         return self._samples_per_symbol

     def carrier_sensed(self):
         """
         Return True if we think carrier is present.
         """
         return self.probe.unmuted()

     def carrier_threshold(self):
         """
         Return current setting in dB.
         """
         return self.probe.threshold()

     def set_carrier_threshold(self, threshold_in_db):
         """
         Set carrier threshold.

         @param threshold_in_db: set detection threshold
         @type threshold_in_db:  float (dB)
         """
         self.probe.set_threshold(threshold_in_db)

     @staticmethod
     def add_options(normal, expert):
         """
         Adds receiver-specific options to the Options Parser.

         Static so that it can be called before an instance exists.

         @param normal: parser (or group) that receives -r/--bitrate (only if
                        not already present) and -v/--verbose
         @param expert: group that receives -S/--samples-per-symbol and --log
         """
         if not normal.has_option("--bitrate"):
             normal.add_option("-r", "--bitrate", type="eng_float", default=100e3,
                               help="specify bitrate [default=%default].")
         normal.add_option("-v", "--verbose", action="store_true", default=False)
         expert.add_option("-S", "--samples-per-symbol", type="int", default=2,
                           help="set samples/symbol [default=%default]")
         expert.add_option("", "--log", action="store_true", default=False,
                           help="Log all parts of flow graph to files (CAUTION: lots of data)")

     def _print_verbage(self):
         """
         Prints information about the receive path
         """
         print("\nReceive Path:")
         print("modulation:      %s"    % (self._demod_class.__name__))
         print("bitrate:         %sb/s" % (eng_notation.num_to_str(self._bitrate)))
         print("samples/symbol:  %3d"   % (self._samples_per_symbol))
 
 # /////////////////////////////////////////////////////////////////////////////
 #                                   main
 # ///////////////////////////////////////////////////////////////////////////// 
 
 # NOTE: a `global` statement at module scope has no effect; n_rcvd and
 # n_right are actually created when main() assigns them.
 global n_rcvd, n_right
 
 def main():
     """
     Parse the command line, build the receive flow graph and run it
     until it finishes.

     Exits with status 1 if positional arguments are given or if no
     receive frequency was specified.
     """
     global n_rcvd, n_right

     # Packet counters; they were updated by a per-packet rx_callback that
     # is no longer part of this script, so they stay at zero.
     n_rcvd = n_right = 0

     demods = modulation_utils.type_1_demods()
     modulation_names = demods.keys()

     # Create Options Parser:
     opt_parser = OptionParser(option_class=eng_option, conflict_handler="resolve")
     expert_group = opt_parser.add_option_group("Expert")

     modulation_help = ("Select modulation from: %s [default=%%default]"
                        % (', '.join(modulation_names),))
     opt_parser.add_option("-m", "--modulation", type="choice",
                           choices=modulation_names, default='dbpsk',
                           help=modulation_help)

     usrp_receive_path.add_options(opt_parser, expert_group)

     # Let every available demodulator register its own expert options.
     for demod_class in demods.values():
         demod_class.add_options(expert_group)

     (options, args) = opt_parser.parse_args()

     # No positional arguments are accepted.
     if args:
         opt_parser.print_help(sys.stderr)
         sys.exit(1)

     if options.rx_freq is None:
         sys.stderr.write("You must specify -f FREQ or --freq FREQ\n")
         opt_parser.print_help(sys.stderr)
         sys.exit(1)

     # build the graph
     flow_graph = my_top_block(demods[options.modulation], options)

     # Realtime scheduling is best-effort: warn and carry on if unavailable.
     rt_status = gr.enable_realtime_scheduling()
     if rt_status != gr.RT_OK:
         print("Warning: Failed to enable realtime scheduling.")

     flow_graph.start()        # start flow graph
     flow_graph.wait()         # wait for it to finish
 
 # Script entry point: run main() and exit quietly, without a traceback,
 # when the user interrupts with Ctrl-C.
 if __name__ == '__main__':
     try:
         main()
     except KeyboardInterrupt:
         pass
----