⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 avdtp.py

📁 蓝牙通讯协议中A2DP应用的程序
💻 PY
字号:
"""Partial realization of AVDTP, A2DP and AVRCP profiles
*  Copyright (C) 2006  Sergei Krivov <krivov@yahoo.com>
*
*  This program is free software; you can redistribute it and/or modify
*  it under the terms of the GNU General Public License as published by
*  the Free Software Foundation; either version 2 of the License, or
*  (at your option) any later version.
*
*  This program is distributed in the hope that it will be useful,
*  but WITHOUT ANY WARRANTY; without even the implied warranty of
*  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
*  GNU General Public License for more details.
*
*  You should have received a copy of the GNU General Public License
*  along with this program; if not, write to the Free Software
*  Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
"""
import ctypes
try:
    from ctypes import Structure, c_uint8, c_uint16, c_ulong, c_uint32, c_void_p
except ImportError:  # for old versions of ctypes
    from ctypes import Structure, c_ubyte, c_ushort, c_ulong, c_uint, c_void_p
    c_uint8 = c_ubyte
    c_uint16 = c_ushort
    c_uint32 = c_uint
import bluetooth
import struct
import socket

# Signal ids
AVDTP_DISCOVER = 1
AVDTP_GET_CAPABILITIES = 2
AVDTP_SET_CONFIGURATION = 3
AVDTP_GET_CONFIGURATION = 4
AVDTP_RECONFIGURE = 5
AVDTP_OPEN = 6
AVDTP_START = 7
AVDTP_CLOSE = 8
AVDTP_SUSPEND = 9
AVDTP_ABORT = 10
AVDTP_SECURITY_CONTROL = 11

MEDIA_TRANSPORT_CATEGORY = 1
MEDIA_CODEC = 7
SBC_MEDIA_CODEC_TYPE = 0
AUDIO_MEDIA_TYPE = 0

# Packet types
PACKET_TYPE_SINGLE = 0
PACKET_TYPE_START = 1
PACKET_TYPE_CONTINUE = 2
PACKET_TYPE_END = 3

# Message types
MESSAGE_TYPE_COMMAND = 0
MESSAGE_TYPE_ACCEPT = 2
MESSAGE_TYPE_REJECT = 3

MEDIA_PACKET_HEADER_LENGTH = 14
MAX_ADDITIONAL_CODEC = 4

# L2CAP MTU requested on every channel and enforced on media packets.
L2CAP_MTU = 672

deb = 1   # debug verbosity: 0 = silent, 1 = progress messages, >1 = packet dumps
tran = 0  # next transaction label, a 4 bit counter (see init_command_single)


def _debug(*args):
    """Print *args* separated by single spaces.

    Produces the same text as the Python 2 statement ``print a, b, c`` while
    staying valid syntax under Python 3.
    """
    print(" ".join(str(arg) for arg in args))


def _struct_from_bytes(struct_class, data):
    """Return a new *struct_class* instance filled from the byte string *data*.

    The bytes are copied into memory owned by the new structure, so the result
    can be modified without touching *data*. Input shorter than the structure
    leaves the remaining bytes zero; longer input is truncated.
    """
    obj = struct_class()
    count = min(len(data), ctypes.sizeof(obj))
    ctypes.memmove(ctypes.addressof(obj), data, count)
    return obj


class message_header_single(Structure):
    """Two byte signalling header of a single (unfragmented) packet."""
    _pack_ = 1
    _fields_ = [
        ("message_type", c_uint8, 2),
        ("packet_type", c_uint8, 2),
        ("transaction_label", c_uint8, 4),
        ("signal_id", c_uint8, 6),
        ("rfa0", c_uint8, 2),
    ]


class message_header_start(Structure):
    """Signalling header of the first packet of a fragmented message."""
    _pack_ = 1
    # transaction_label has to follow packet_type so that the three bit fields
    # share the first octet, as they do in the other headers. The original
    # order (nsop before transaction_label) split that octet in two and made
    # the structure four bytes long instead of three.
    _fields_ = [
        ("message_type", c_uint8, 2),
        ("packet_type", c_uint8, 2),
        ("transaction_label", c_uint8, 4),
        ("nsop", c_uint8, 8),
        ("signal_id", c_uint8, 6),
        ("rfa0", c_uint8, 2),
    ]


class message_header_continue(Structure):
    """One byte signalling header of a continue/end packet."""
    _pack_ = 1
    _fields_ = [
        ("message_type", c_uint8, 2),
        ("packet_type", c_uint8, 2),
        ("transaction_label", c_uint8, 4),
    ]


class message_single(Structure):
    """Single-packet command that carries one acceptor SEID."""
    _pack_ = 1
    _fields_ = [
        ("header", message_header_single),
        ("rfa0", c_uint8, 2),
        ("acp_seid", c_uint8, 6),
    ]


class sbc_codec_elements(Structure):
    """SBC codec information elements (bit masks plus bitpool range)."""
    _pack_ = 1
    _fields_ = [
        ("channel_mode", c_uint8, 4),
        ("frequency", c_uint8, 4),
        ("allocation_method", c_uint8, 2),
        ("subbands", c_uint8, 2),
        ("block_length", c_uint8, 4),
        ("min_bitpool", c_uint8, 8),
        ("max_bitpool", c_uint8, 8),
    ]


class acp_seid_info(Structure):
    """Description of one stream end point in a discover response."""
    _pack_ = 1
    _fields_ = [
        ("rfa0", c_uint8, 1),
        ("inuse", c_uint8, 1),
        ("acp_seid", c_uint8, 6),
        ("rfa1", c_uint8, 3),
        ("tsep", c_uint8, 1),
        ("media_type", c_uint8, 4),
    ]


class sepd_resp(Structure):
    """Discover response: header plus end points n0 .. n<MAX_ADDITIONAL_CODEC>."""
    _pack_ = 1
    _fields_ = [("header", message_header_single)] + [
        ("n%i" % i, acp_seid_info) for i in range(1 + MAX_ADDITIONAL_CODEC)
    ]


class sepd_reject(Structure):
    """Discover reject: header plus error code."""
    _pack_ = 1
    _fields_ = [("header", message_header_single), ("error", c_uint8)]


class getcap_resp(Structure):
    """Get-capabilities response as parsed by this module (SBC layout)."""
    _pack_ = 1
    _fields_ = (
        [("header", message_header_single)]
        + [(name, c_uint8, 8) for name in (
            "serv_cat", "serv_cap_len", "cap_type", "length",
            "media_type", "media_codec_type")]
        + [("sbc_elements", sbc_codec_elements)]
    )


class set_sbc_req(Structure):
    """Set-configuration command carrying an SBC codec configuration."""
    _pack_ = 1
    _fields_ = (
        [("header", message_header_single),
         ("rfa0", c_uint8, 2), ("acp_seid", c_uint8, 6),
         ("rfa1", c_uint8, 2), ("int_seid", c_uint8, 6)]
        + [(name, c_uint8, 8) for name in (
            "serv_cap", "serv_cap_len", "cap_type", "length",
            "media_type", "media_codec_type")]
        + [("sbc_elements", sbc_codec_elements)]
    )


class set_sbc_resp(Structure):
    """Response to a set-configuration command."""
    _pack_ = 1
    _fields_ = [
        ("header", message_header_single),
        ("serv_cap", c_uint8, 8),
        ("acp_seid", c_uint8, 8),
    ]


class open_strm_resp(Structure):
    """Response to open/close stream commands: header plus error code."""
    _pack_ = 1
    _fields_ = [("header", message_header_single), ("error", c_uint8, 8)]


class start_strm_resp(Structure):
    """Response to start/suspend stream commands."""
    _pack_ = 1
    _fields_ = [
        ("header", message_header_single),
        ("rfa0", c_uint8, 2),
        ("acp_seid", c_uint8, 6),
        ("error", c_uint8, 8),
    ]


def print_fields(st, gap=''):
    """Recursively print the fields of the ctypes structure *st*.

    *gap* is the indentation prefix; nested structures are printed with two
    more spaces.
    """
    _debug(gap + str(st.__class__) + ':')
    for field in st._fields_:
        name = field[0]
        value = getattr(st, name)
        # Only nested structures have _fields_; testing for it explicitly
        # (instead of a bare except) keeps errors of the recursive call visible.
        if hasattr(value, "_fields_"):
            _debug(name + ":")
            print_fields(value, gap + '  ')
        else:
            _debug(gap + str(name), value)


def avdtp_connect(dst, psm=25):
    """Open an L2CAP connection to address *dst* on *psm* and return the socket.

    The socket is closed again if binding, connecting or setting the MTU
    fails; the original exception is re-raised.
    """
    sock = bluetooth.BluetoothSocket(bluetooth.L2CAP)
    try:
        sock.bind(("", psm))
        sock.connect((dst, psm))
        if deb:
            _debug("connected to ", dst, psm)
        bluetooth.set_l2cap_mtu(sock, L2CAP_MTU)
    except Exception:
        sock.close()
        raise
    return sock


def avdtp_disconnect(sock):
    """Close *sock*."""
    sock.close()


def init_command_single(req):
    """Fill the header of *req* as a single-packet command and return *req*.

    Uses the module-level transaction label ``tran`` and then advances it
    modulo 16.
    """
    global tran
    req.header.packet_type = PACKET_TYPE_SINGLE
    req.header.message_type = MESSAGE_TYPE_COMMAND
    req.header.transaction_label = tran
    req.header.rfa0 = 0
    tran = (tran + 1) & 0xf
    return req


def send_packet(sock, packet):
    """Send the ctypes structure *packet* over *sock*.

    Raises IOError if the socket reports fewer bytes sent than the size of
    the structure.
    """
    size = ctypes.sizeof(packet)
    if deb > 1:
        _debug("sending packet", size, struct.unpack("%iB" % size, packet))
        print_fields(packet)
    sent = sock.send(packet)
    if sent != size:
        raise IOError('transmission error')


def receive_response(sock, resp_class, resp_error_class=None):
    """Receive one packet from *sock* and return it as a *resp_class* instance.

    A packet shorter than the structure is padded with zeros, a longer one is
    truncated. The returned structure owns a copy of the received bytes.
    *resp_error_class* is accepted for interface compatibility and is unused.

    Raises IOError if nothing is received (the same zero-length convention
    avrcp_receive_commands uses to detect a closed connection); previously
    this was reported as an all-zero, i.e. error-free, response.
    """
    data = sock.recv(1024)
    received = len(data)
    if received == 0:
        raise IOError('connection closed by peer')
    size = ctypes.sizeof(resp_class)
    if received > size:
        if deb > 1:
            _debug("warning, possibly wrong responce class ", received, size)
    elif received < size:
        if deb > 1:
            _debug("warning, got partial responce, pad with zeros", received, size)
        data += struct.pack("%ix" % (size - received))  # 'x' packs zero bytes
    resp = _struct_from_bytes(resp_class, data)
    if deb > 1:
        _debug("received", struct.unpack("%iB" % len(data), data))
        print_fields(resp)
    return resp


def _seid_request(sock, signal_id, seid, resp_class):
    """Send a single-packet *signal_id* command for *seid*; return the reply."""
    cmd = init_command_single(message_single())
    cmd.header.signal_id = signal_id
    cmd.acp_seid = seid
    send_packet(sock, cmd)
    return receive_response(sock, resp_class)


def avdtp_get_capabilities(sock, seid):
    """Request the capabilities of end point *seid*; return a getcap_resp."""
    return _seid_request(sock, AVDTP_GET_CAPABILITIES, seid, getcap_resp)


def avdtp_discover(sock):
    """Discover the remote stream end points.

    Returns a list of ``(getcap_resp, seid)`` tuples, one for every end point
    of the discover response that has a non-zero SEID.
    """
    cmd = init_command_single(message_single())
    cmd.header.signal_id = AVDTP_DISCOVER
    send_packet(sock, cmd)
    resp = receive_response(sock, sepd_resp)
    candidates = [getattr(resp, "n%i" % i).acp_seid
                  for i in range(1 + MAX_ADDITIONAL_CODEC)]
    lseid = [seid for seid in candidates if seid]  # take non zero seid
    if deb:
        _debug("got %i seid" % len(lseid))
    return [(avdtp_get_capabilities(sock, seid), seid) for seid in lseid]


def avdtp_discover_rsp(sock):
    """Answer an incoming discover command with a single end point (SEID 1).

    The reply reuses the transaction label of the received command.
    """
    global tran
    cmd = receive_response(sock, message_single)
    tran = cmd.header.transaction_label
    rsp = init_command_single(sepd_resp())
    rsp.header.message_type = MESSAGE_TYPE_ACCEPT
    rsp.header.signal_id = AVDTP_DISCOVER
    sep = rsp.n0  # named 'sbc' originally, which shadowed the sbc class
    sep.acp_seid = 1
    sep.inuse = 0
    sep.tsep = 1
    # The original assigned SBC_MEDIA_CODEC_TYPE here; both constants are 0,
    # AUDIO_MEDIA_TYPE is the one that names a media type.
    sep.media_type = AUDIO_MEDIA_TYPE
    send_packet(sock, rsp)


def set_sbc_configuration(sock, capb_resp, seid, sbc_codec):
    """Configure end point *seid* for SBC using the settings of *sbc_codec*.

    *capb_resp* is the getcap_resp of the end point; its SBC elements are the
    starting point of the request. Every parameter listed in
    ``sbc_codec.conf_dict`` is translated to its protocol code and written
    into the request when the end point advertises that code.

    Returns ``(seid, sbc_codec)``. Raises IOError if the request is not
    accepted.
    """
    cmd = init_command_single(set_sbc_req())
    cmd.header.signal_id = AVDTP_SET_CONFIGURATION
    cmd.serv_cap = MEDIA_TRANSPORT_CATEGORY
    cmd.acp_seid = seid
    cmd.int_seid = 1
    cmd.cap_type = MEDIA_CODEC
    cmd.length = 6
    cmd.media_type = AUDIO_MEDIA_TYPE
    cmd.media_codec_type = SBC_MEDIA_CODEC_TYPE
    cmd.sbc_elements = capb_resp.sbc_elements
    cmd.sbc_elements.allocation_method = 2
    # values of parameters in sbc routine and here are different
    for par, vals in sbc_codec.conf_dict.items():
        codec_att = getattr(sbc_codec.par, par)
        code = vals[codec_att]
        resp_att = getattr(capb_resp.sbc_elements, par)
        if resp_att & code:
            if deb:
                _debug('setting %s=%i, %i' % (par, code, codec_att))
            setattr(cmd.sbc_elements, par, code)
        elif deb:
            _debug('can not set %s=%i, %i' % (par, code, codec_att))
    send_packet(sock, cmd)
    resp = receive_response(sock, set_sbc_resp)
    if resp.header.message_type != MESSAGE_TYPE_ACCEPT:
        raise IOError('Can not set SBC codec parameters')
    if deb:
        _debug("Successfully set SBC codec parameters")
    return seid, sbc_codec


def avdtp_set_configuration(sock, lsep, codecs):
    """ select codec from available codecs
        and sets codec configuration
        just SBC codec is implemented
        return seid, codec

    *lsep* is the list returned by avdtp_discover; *codecs* is indexed by
    codec type. Raises IOError if no end point offers SBC audio.
    """
    lsbc = [(resp, seid) for resp, seid in lsep
            if resp.header.message_type != MESSAGE_TYPE_REJECT
            and resp.media_codec_type == SBC_MEDIA_CODEC_TYPE
            and resp.media_type == AUDIO_MEDIA_TYPE]
    if not lsbc:
        raise IOError('ACP site dose not have SBC codec')
    if len(lsbc) > 1 and deb:
        # if ever possible
        _debug('ACP site has more then one SBC codec, take first')
    resp, seid = lsbc[0]
    if deb:
        _debug("seid=", seid)
    return set_sbc_configuration(sock, resp, seid, codecs[SBC_MEDIA_CODEC_TYPE])


def avdtp_open(dst, sock, seid):
    """open the audio stream

    Sends the open command for *seid* on *sock* and then returns a second
    connection to *dst* made with avdtp_connect. Raises IOError if the
    response carries an error code.
    """
    resp = _seid_request(sock, AVDTP_OPEN, seid, open_strm_resp)
    if resp.error:
        raise IOError('Can not open stream')
    if deb:
        _debug("opened stream")
    return avdtp_connect(dst, 25)


def avdtp_start(sock, seid):
    """Start the stream of *seid*; raise IOError on an error response."""
    resp = _seid_request(sock, AVDTP_START, seid, start_strm_resp)
    if resp.error:
        raise IOError('Can not start stream')
    if deb:
        _debug("started stream")


def avdtp_suspend(sock, seid):
    """Suspend the stream of *seid*; raise IOError on an error response."""
    resp = _seid_request(sock, AVDTP_SUSPEND, seid, start_strm_resp)
    if resp.error:
        raise IOError('Can not stop stream')
    if deb:
        _debug("suspended stream")


def avdtp_close(sock, seid):
    """Close the stream of *seid*; raise IOError on an error response."""
    resp = _seid_request(sock, AVDTP_CLOSE, seid, open_strm_resp)
    if resp.error:
        raise IOError('Can not close stream')
    if deb:
        _debug("closed steam")


class media_packet_header(Structure):
    """Twelve byte header placed in front of every media packet."""
    _pack_ = 1
    _fields_ = [
        ("cc", c_uint8, 4), ("x", c_uint8, 1), ("p", c_uint8, 1), ("v", c_uint8, 2),
        ("pt", c_uint8, 7), ("m", c_uint8, 1),
        ("sequence_number", c_uint16),
        ("time_stamp", c_uint32),
        ("ssrc", c_uint32),
    ]


class media_payload_header(Structure):
    """One byte payload header: frame count and fragmentation flags."""
    _pack_ = 1
    _fields_ = [
        ("frame_count", c_uint8, 4),
        ("rfa01", c_uint8, 1),
        ("is_last_fragment", c_uint8, 1),
        ("is_first_fragment", c_uint8, 1),
        ("is_fragmented", c_uint8, 1),
    ]


def media_packet(data, timestamp, frame_count, seq_number):
    """Wrap encoded audio *data* into a media packet structure.

    *data* has to be a ctypes object (ctypes.sizeof() is applied to it and it
    is assigned to a c_uint8 array field) -- presumably a c_uint8 array.

    NOTE(review): *timestamp* and *seq_number* are accepted but, exactly as in
    the original code, zero is written to both header fields. Confirm whether
    this is intended before relying on them.

    Raises ValueError if the packet would exceed L2CAP_MTU.
    """
    overhead = (ctypes.sizeof(media_payload_header)
                + ctypes.sizeof(media_packet_header))
    if len(data) + overhead > L2CAP_MTU:
        raise ValueError('Media packet size >mtu')

    class _media_packet(Structure):
        _pack_ = 1
        _fields_ = [
            ("media_packet_header", media_packet_header),
            ("media_payload_header", media_payload_header),
            ("data", c_uint8 * ctypes.sizeof(data)),
        ]

    packet = _media_packet()
    packet.media_packet_header.v = 2
    packet.media_packet_header.pt = 1
    packet.media_packet_header.sequence_number = socket.htons(0)  # seq_number
    packet.media_packet_header.time_stamp = socket.htonl(0)  # timestamp
    packet.media_packet_header.ssrc = socket.htonl(1)
    packet.media_payload_header.frame_count = frame_count
    packet.media_payload_header.is_fragmented = 0
    # The field is called rfa01; assigning to 'rfa' (as the original did) only
    # created an unrelated Python attribute on the structure object.
    packet.media_payload_header.rfa01 = 0
    packet.data = data
    return packet


##### AVCTP & AVRCP ###########################################################

# Message types
AVCTP_COMMAND_FRAME = 0
AVCTP_RESPONSE_FRAME = 1

CMD_PASSTHROUGH = 0
CMD_ACCEPTED = 9

PLAY_OP = 68   # 0x44
STOP_OP = 69   # 0x45
PAUSE_OP = 70  # 0x46
NEXT_OP = 75   # 0x4b
PREV_OP = 76   # 0x4c


class avctp_header(Structure):
    """AVCTP packet header."""
    _pack_ = 1
    _fields_ = [
        ("ipid", c_uint8, 1), ("cr", c_uint8, 1), ("packet_type", c_uint8, 2),
        ("transaction_label", c_uint8, 4),
        ("pid", c_uint16),
    ]


class avctp_frame(Structure):
    """AVCTP header followed by a command frame with two operands."""
    _pack_ = 1
    _fields_ = [
        ("header", avctp_header),
        ("ctype", c_uint8, 4), ("zeros", c_uint8, 4),
        ("subunit_id", c_uint8, 3), ("subunit_type", c_uint8, 5),
        ("opcode", c_uint8, 8),
        ("operand0", c_uint8, 8), ("operand1", c_uint8, 8),
    ]


def avrcp_accept_connection():
    """Wait for one incoming L2CAP connection on port 23; return its socket.

    The listening socket is closed before returning, also when accepting
    fails.
    """
    listener = bluetooth.BluetoothSocket(bluetooth.L2CAP)
    try:
        listener.bind(("", 23))
        listener.listen(1)
        client_sock, address = listener.accept()
    finally:
        listener.close()
    if deb:
        _debug("Accepted connection from ", address)
    return client_sock


def avrcp_command(sock, data, callback):
    """Handle one received AVCTP packet *data* and acknowledge it on *sock*.

    For a pass-through command *callback* is called with the first operand.
    The reply is the received frame with the response bit set and the command
    type replaced by CMD_ACCEPTED.

    Raises ValueError if the packet is not a single packet.
    """
    # Work on a private, zero padded copy: the received string is immutable
    # and may be shorter than the frame structure.
    cmd = _struct_from_bytes(avctp_frame, data)
    if cmd.header.packet_type != PACKET_TYPE_SINGLE:
        raise ValueError('packet type != PACKET_TYPE_SINGLE')
    if cmd.ctype == CMD_PASSTHROUGH:
        callback(cmd.operand0)
    cmd.header.ipid = 0  # use the same packet for the response
    cmd.header.cr = AVCTP_RESPONSE_FRAME
    cmd.header.packet_type = PACKET_TYPE_SINGLE
    cmd.ctype = CMD_ACCEPTED
    send_packet(sock, cmd)


def avrcp_receive_commands(sock, callback):
    """Process commands from *sock* until an empty packet is received.

    *sock* is always closed on exit, including when a command handler raises.
    """
    try:
        while True:
            data = sock.recv(1024)
            if len(data) == 0:
                break
            avrcp_command(sock, data, callback)
    finally:
        sock.close()


##### SBC codec, libsbc #######################################################

class sbc_struct(Structure):
    """Parameter block handed to the sbc_* functions of libsbc.so."""
    _fields_ = [
        ("flags", c_ulong),
        ("frequency", c_uint32), ("channel_mode", c_uint32), ("joint", c_uint32),
        ('block_length', c_uint32), ('subbands', c_uint32), ('bitpool', c_uint32),
        ('data', c_void_p), ('size', c_uint32), ('len', c_uint32),
        ('duration', c_ulong),
        ('priv', c_void_p),
    ]


class sbc(object):
    """Thin wrapper around the SBC encoder of libsbc.so."""

    def __init__(self, *arg, **kwd):
        """Load libsbc.so, initialise the encoder and call configure().

        Positional and keyword arguments are passed on to configure(). A
        non-zero return value of sbc_init is only reported, not raised.
        """
        self.par = sbc_struct()
        self.libsbc = ctypes.CDLL('libsbc.so')
        err = self.libsbc.sbc_init(ctypes.byref(self.par), c_ulong(1))
        if err:
            _debug('error initializing sbc coder', err)
        self.configure(*arg, **kwd)

    def configure(self, frequency=48000, channels=2, bitpool=32):
        """Store the encoder parameters and build ``conf_dict``.

        ``conf_dict`` maps a parameter name to a dict that translates the
        value used by the encoder into the code used in the SBC codec
        elements (see set_sbc_configuration).
        """
        self.par.subbands = 8
        self.par.block_length = 16
        self.par.bitpool = bitpool
        self.par.frequency = frequency
        self.par.channel_mode = channels
        self.conf_dict = {
            'frequency': {48000: 1, 44100: 2, 32000: 4, 16000: 8},
            'subbands': {8: 1, 4: 2},
            'block_length': {16: 1, 12: 2, 8: 4, 4: 8},
            'channel_mode': {2: 2, 1: 1},
        }

    def encode(self, data):
        """Pass *data* to sbc_encode.

        Returns ``(result of sbc_encode, par.data, par.len, par.duration)``,
        or ``(0, 0, 0, 0)`` for empty input.
        """
        if len(data) == 0:
            return 0, 0, 0, 0
        consumed = self.libsbc.sbc_encode(ctypes.byref(self.par), data, len(data))
        return consumed, self.par.data, self.par.len, self.par.duration

    def fin(self):
        """Call sbc_finish on the parameter block."""
        self.libsbc.sbc_finish(ctypes.byref(self.par))

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -