| #!/usr/bin/env python |
| ################################################## |
| # Gnuradio Python Flow Graph |
| # Title: Airprobe File |
| # Generated: Sat Feb 7 19:30:42 2015 |
| ################################################## |
| |
| from gnuradio import blocks |
| from gnuradio import eng_notation |
| from gnuradio import gr |
| from gnuradio.eng_option import eng_option |
| from gnuradio.filter import firdes |
| from optparse import OptionParser |
| import grgsm |
| import pmt |
| |
| class airprobe_file(gr.top_block): |
| |
| def __init__(self, input_file_name="input.cfile", fc=937e6, samp_rate=1e6): |
| gr.top_block.__init__(self, "Airprobe File") |
| |
| ################################################## |
| # Parameters |
| ################################################## |
| self.input_file_name = input_file_name |
| self.fc = fc |
| self.samp_rate = samp_rate |
| |
| ################################################## |
| # Variables |
| ################################################## |
| self.SDCCH = SDCCH = 6 |
| self.RACH = RACH = 3 |
| self.PCH = PCH = 5 |
| self.CHANNEL_UNKNOWN = CHANNEL_UNKNOWN = 0 |
| self.CCCH = CCCH = 2 |
| self.BCCH = BCCH = 1 |
| self.AGCH = AGCH = 4 |
| |
| ################################################## |
| # Blocks |
| ################################################## |
| self.gsm_universal_ctrl_chans_demapper_0 = grgsm.universal_ctrl_chans_demapper(0, ([2,6,12,16,22,26,32,36,42,46]), ([BCCH,CCCH,CCCH,CCCH,CCCH,CCCH,CCCH,CCCH,CCCH,CCCH])) |
| self.gsm_receiver_0 = grgsm.receiver(4, ([0]), ([])) |
| self.gsm_message_printer_0 = grgsm.message_printer(pmt.intern("")) |
| self.gsm_input_0 = grgsm.gsm_input( |
| ppm=0, |
| osr=4, |
| fc=fc, |
| samp_rate_in=samp_rate, |
| ) |
| self.gsm_control_channels_decoder_0 = grgsm.control_channels_decoder() |
| self.gsm_clock_offset_control_0 = grgsm.clock_offset_control(fc) |
| self.blocks_socket_pdu_0 = blocks.socket_pdu("UDP_CLIENT", "127.0.0.1", "4729", 10000) |
| self.blocks_file_source_0 = blocks.file_source(gr.sizeof_gr_complex*1, input_file_name, False) |
| |
| ################################################## |
| # Connections |
| ################################################## |
| self.msg_connect(self.gsm_clock_offset_control_0, 'ppm', self.gsm_input_0, 'ppm_in') |
| self.msg_connect(self.gsm_control_channels_decoder_0, 'msgs', self.blocks_socket_pdu_0, 'pdus') |
| self.msg_connect(self.gsm_control_channels_decoder_0, 'msgs', self.gsm_message_printer_0, 'msgs') |
| self.msg_connect(self.gsm_receiver_0, 'measurements', self.gsm_clock_offset_control_0, 'measurements') |
| self.msg_connect(self.gsm_receiver_0, 'C0', self.gsm_universal_ctrl_chans_demapper_0, 'bursts') |
| self.msg_connect(self.gsm_universal_ctrl_chans_demapper_0, 'bursts', self.gsm_control_channels_decoder_0, 'bursts') |
| self.connect((self.blocks_file_source_0, 0), (self.gsm_input_0, 0)) |
| self.connect((self.gsm_input_0, 0), (self.gsm_receiver_0, 0)) |
| |
| |
| def get_input_file_name(self): |
| return self.input_file_name |
| |
| def set_input_file_name(self, input_file_name): |
| self.input_file_name = input_file_name |
| self.blocks_file_source_0.open(self.input_file_name, False) |
| |
| def get_fc(self): |
| return self.fc |
| |
| def set_fc(self, fc): |
| self.fc = fc |
| self.gsm_input_0.set_fc(self.fc) |
| |
| def get_samp_rate(self): |
| return self.samp_rate |
| |
| def set_samp_rate(self, samp_rate): |
| self.samp_rate = samp_rate |
| self.gsm_input_0.set_samp_rate_in(self.samp_rate) |
| |
| def get_SDCCH(self): |
| return self.SDCCH |
| |
| def set_SDCCH(self, SDCCH): |
| self.SDCCH = SDCCH |
| |
| def get_RACH(self): |
| return self.RACH |
| |
| def set_RACH(self, RACH): |
| self.RACH = RACH |
| |
| def get_PCH(self): |
| return self.PCH |
| |
| def set_PCH(self, PCH): |
| self.PCH = PCH |
| |
| def get_CHANNEL_UNKNOWN(self): |
| return self.CHANNEL_UNKNOWN |
| |
| def set_CHANNEL_UNKNOWN(self, CHANNEL_UNKNOWN): |
| self.CHANNEL_UNKNOWN = CHANNEL_UNKNOWN |
| |
| def get_CCCH(self): |
| return self.CCCH |
| |
| def set_CCCH(self, CCCH): |
| self.CCCH = CCCH |
| |
| def get_BCCH(self): |
| return self.BCCH |
| |
| def set_BCCH(self, BCCH): |
| self.BCCH = BCCH |
| |
| def get_AGCH(self): |
| return self.AGCH |
| |
| def set_AGCH(self, AGCH): |
| self.AGCH = AGCH |
| |
| if __name__ == '__main__': |
| parser = OptionParser(option_class=eng_option, usage="%prog: [options]") |
| parser.add_option("-i", "--input-file-name", dest="input_file_name", type="string", default="input.cfile", |
| help="Set input.cfile [default=%default]") |
| parser.add_option("-f", "--fc", dest="fc", type="eng_float", default=eng_notation.num_to_str(937e6), |
| help="Set fc [default=%default]") |
| parser.add_option("-s", "--samp-rate", dest="samp_rate", type="eng_float", default=eng_notation.num_to_str(1e6), |
| help="Set samp_rate [default=%default]") |
| (options, args) = parser.parse_args() |
| tb = airprobe_file(input_file_name=options.input_file_name, fc=options.fc, samp_rate=options.samp_rate) |
| tb.start() |
| tb.wait() |