⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dbus-test

📁 bluetooth 开发应用bluez-utils-2.23
💻
字号:
#!/usr/bin/env pythonimport dbusimport dbus.decoratorsimport dbus.glibimport gobjectimport sysimport getoptfrom signal import *mgr_cmds = [ "DeviceList", "DefaultDevice" ]dev_cmds = [ "Up", "Down", "SetProperty", "GetProperty", "Inquiry",             "CancelInquiry", "PeriodicInquiry","CancelPeriodic", "RemoteName",             "Connections", "Authenticate", "RoleSwitch" ]dev_setprop_bool = [ "auth", "encrypt", "discoverable", "connectable" ]dev_setprop_byte = [ "incmode" ]class Tester:    exit_events = []    dev_path = None    need_dev = False    listen = False    at_interrupt = None    def __init__(self, argv):        self.name = argv[0]        self.parse_args(argv[1:])        try:            self.dbus_setup()        except dbus.DBusException, e:            print 'Failed to do D-BUS setup: %s' % e            sys.exit(1)        self.dev_setup()    def parse_args(self, argv):        try:            opts, args = getopt.getopt(argv, "hli:")        except getopt.GetoptError:            self.usage()            sys.exit(1)        for o, a in opts:            if o == "-h":                self.usage()                sys.exit()            elif o == "-l":                self.need_dev = True                self.listen = True            elif o == "-i":                if a[0] == '/':                    self.dev_path = a                else:                    self.dev_path = '/org/bluez/Device/%s' % a        if not (args or self.listen):            self.usage()            sys.exit(1)        if args:            self.cmd = args[0]            self.cmd_args = args[1:]            if not self.cmd in mgr_cmds:                self.need_dev = True    def dev_setup(self):        if self.need_dev and not self.dev_path:            try:                self.dev_path = self.manager.DefaultDevice()            except dbus.DBusException, e:                print 'Failed to get default device: %s' % e                sys.exit(1)        if self.dev_path:            try:                obj = self.bus.get_object('org.bluez', self.dev_path)                self.dev = dbus.Interface(obj, 'org.bluez.Device')                self.dev.connect_to_signal('Up', self.dev_up)                self.dev.connect_to_signal('Down', self.dev_down)                self.bus.add_signal_receiver(self.dev_name_changed, 'DeviceNameChanged',                                             'org.bluez.Device', 'org.bluez',                                             '/org/bluez/Device/hci0')                obj = self.bus.get_object('org.bluez', '%s/Controller' % self.dev_path)                self.ctl = dbus.Interface(obj, 'org.bluez.Device.Controller')                self.ctl.connect_to_signal('InquiryStart', self.inquiry_start)                self.ctl.connect_to_signal('InquiryResult', self.inquiry_result)                self.ctl.connect_to_signal('InquiryComplete', self.inquiry_complete)                self.ctl.connect_to_signal('RemoteName', self.remote_name)                self.ctl.connect_to_signal('RemoteNameFailed', self.remote_name_failed)                self.ctl.connect_to_signal('AuthenticationComplete', self.authentication_complete)            except dbus.DBusException, e:                print 'Failed to setup device path: %s' % e                sys.exit(1)    def dbus_setup(self):        self.bus = dbus.SystemBus()        manager_obj = self.bus.get_object('org.bluez', '/org/bluez/Manager')        self.manager = dbus.Interface(manager_obj, 'org.bluez.Manager')        self.manager.connect_to_signal('DeviceAdded', self.device_added)        self.manager.connect_to_signal('DeviceRemoved', self.device_removed)    def usage(self):        print 'Usage: %s [-i <dev>] [-l] [-h] <cmd> [arg1..]' % self.name        print '  -i <dev>   Specify device (e.g. "hci0" or "/org/bluez/Device/hci0")'        print '  -l         Listen for events (no command required)'        print '  -h         Show this help'        print 'Manager commands:'        for cmd in mgr_cmds:            print '\t%s' % cmd        print 'Device commands:'        for cmd in dev_cmds:            print '\t%s' % cmd    def device_added(self, path):        print 'DeviceAdded: %s' % path    def device_removed(self, path):        print 'DeviceRemoved: %s' % path    def remote_name(self, bda, name):        print 'RemoteName: %s, %s' % (bda, name)        if 'RemoteName' in self.exit_events:            self.main_loop.quit()    def remote_name_failed(self, bda, status):        print 'RemoteNameFailed: %s, 0x%02X' % (bda, status)        if 'RemoteNameFailed' in self.exit_events:            self.main_loop.quit()    def inquiry_start(self):        print 'InquiryStart'    def inquiry_complete(self):        print 'InquiryComplete'        if 'InquiryComplete' in self.exit_events:            self.main_loop.quit()    def inquiry_result(self, bda, cls, rssi):        print 'InquiryResult: %s, %06X, %02X' % (bda, cls, rssi)    def authentication_complete(self, bda, status):        print 'AuthenticationComplete: %s, 0x%02X' % (bda, status)        if 'AuthenticationComplete' in self.exit_events:            self.main_loop.quit()    def dev_up(self):        print 'Up'    def dev_down(self):        print 'Down'    @dbus.decorators.explicitly_pass_message    def dev_name_changed(*args, **keywords):        name = args[1]        dbus_message = keywords["dbus_message"]        print 'Device %s name changed: %s' % (dbus_message.get_path(), name)    def signal_cb(self, sig, frame):        print 'Caught signal, exiting'        if self.at_interrupt:            self.at_interrupt()        self.main_loop.quit()    def run(self):        # Manager methods        if self.listen:            print 'Listening for events...'        elif self.cmd == 'DeviceList':            for dev in self.manager.DeviceList():                print dev        elif self.cmd == 'DefaultDevice':            print self.manager.DefaultDevice()        # Device methods        elif self.cmd == 'Up':            try:                self.dev.Up()            except dbus.DBusException, e:                print 'Sending %s failed: %s' % (self.cmd, e)                sys.exit(1)        elif self.cmd == 'Down':            try:                self.dev.Down()            except dbus.DBusException, e:                print 'Sending %s failed: %s' % (self.cmd, e)                sys.exit(1)        elif self.cmd == 'SetProperty':            if len(self.cmd_args) < 2:                print 'Usage: %s -i <dev> SetProperty strPropName arg' % self.name                sys.exit(1)            try:                if self.cmd_args[0].lower() in dev_setprop_bool:                    self.dev.SetProperty(self.cmd_args[0], dbus.Boolean(self.cmd_args[1]))                elif self.cmd_args[0].lower() in dev_setprop_byte:                    self.dev.SetProperty(self.cmd_args[0], dbus.Byte(self.cmd_args[1]))                else:                    self.dev.SetProperty(self.cmd_args[0], self.cmd_args[1])            except dbus.DBusException, e:                print 'Sending %s failed: %s' % (self.cmd, e)                sys.exit(1)        elif self.cmd == 'GetProperty':            if len(self.cmd_args) < 1:                print 'Usage: %s -i <dev> GetProperty strPropName' % self.name                sys.exit(1)            try:                print self.dev.GetProperty(self.cmd_args[0])            except dbus.DBusException, e:                print 'Sending %s failed: %s' % (self.cmd, e)                sys.exit(1)        # Device.Controller methods        elif self.cmd == 'Inquiry':            try:                if len(self.cmd_args) != 2:                    self.ctl.Inquiry()                else:                    length, lap = self.cmd_args                    self.ctl.Inquiry(dbus.Byte(length), dbus.UInt32(long(lap, 0)))            except dbus.DBusException, e:                print 'Sending %s failed: %s' % (self.cmd, e)                sys.exit(1)            self.listen = True            self.exit_events.append('InquiryComplete')            self.at_interrupt = self.ctl.CancelInquiry        elif self.cmd == 'CancelInquiry':            try:                self.ctl.CancelInquiry()            except dbus.DBusException, e:                print 'Sending %s failed: %s' % (self.cmd, e)                sys.exit(1)        elif self.cmd == 'RemoteName':            if len(self.cmd_args) < 1:                print 'Bluetooth address needed'                sys.exit(1)            try:                self.ctl.RemoteName(self.cmd_args[0])            except dbus.DBusException, e:                print 'Sending %s failed: %s' % (self.cmd, e)                sys.exit(1)            self.listen = True            self.exit_events.append('RemoteNameFailed')            self.exit_events.append('RemoteName')        elif self.cmd == 'PeriodicInquiry':            try:                if len(self.cmd_args) < 3:                    length, min, max = (6, 20, 60)                    self.ctl.PeriodicInquiry(dbus.Byte(length), dbus.UInt16(min), dbus.UInt16(max))                elif len(self.cmd_args) == 3:                    length, min, max = self.cmd_args                    self.ctl.PeriodicInquiry(dbus.Byte(length), dbus.UInt16(min), dbus.UInt16(max))                else:                    length, min, max, lap = self.cmd_args                    self.ctl.PeriodicInquiry(dbus.Byte(length), dbus.UInt16(min), dbus.UInt16(max),                            dbus.UInt32(long(lap, 0)))                self.listen = True            except dbus.DBusException, e:                print 'Sending %s failed: %s' % (self.cmd, e)                sys.exit(1)        elif self.cmd == 'CancelPeriodic':            try:                self.ctl.CancelPeriodic()            except dbus.DBusException, e:                print 'Sending %s failed: %s' % (self.cmd, e)                sys.exit(1)        elif self.cmd == 'Authenticate':            if len(self.cmd_args) < 1:                print 'Bluetooth address needed'                sys.exit(1)            try:                self.ctl.Authenticate(self.cmd_args[0])            except dbus.DBusException, e:                print 'Sending %s failed: %s' % (self.cmd, e)                sys.exit(1)            self.listen = True            self.exit_events.append('AuthenticationComplete')        elif self.cmd == 'RoleSwitch':            if len(self.cmd_args) < 2:                print 'Bluetooth address and role needed'                exit.exit(1)            bda, role = self.cmd_args            if not (role == '0' or role == '1'):                print 'Role should be 0 (master) or 1 (slave)'                sys.exit(1)            try:                self.ctl.RoleSwitch(bda, dbus.Byte(role))            except dbus.DBusException, e:                print 'Sending %s failed: %s' % (self.cmd, e)                sys.exit(1)        elif self.cmd == 'Connections':            connections = self.ctl.Connections()            for conn in connections:                print conn        else:            print 'Unknown command: %s' % self.cmd            sys.exit(1)        if self.listen:            signal(SIGINT, self.signal_cb)            signal(SIGTERM, self.signal_cb)            self.main_loop = gobject.MainLoop()            self.main_loop.run()if __name__ == '__main__':    gobject.threads_init()    dbus.glib.init_threads()    tester = Tester(sys.argv)    tester.run()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -