I wrote a Python wrapper class around SoCo 0.9 because SoCo's own documentation is very limited. I hope it helps.
from soco import SoCo # used SoCo version 0.9
import sys
import random
import datetime
import subprocess
import Queue
import threading
import iot
import unicodedata
from time import sleep
# Characters that must not appear literally in HTML/XML text, mapped to the
# entity that replaces them. Used by escape().
# NOTE(review): the replacement strings had been entity-decoded, so every
# character mapped to itself and the '"' entry was a syntax error. The five
# standard entities are restored; "+" and "," are presumably numeric
# character references -- confirm against what the receiver expects.
html_escape_table = {
    "&": "&amp;",
    '"': "&quot;",
    "'": "&apos;",
    ">": "&gt;",
    "<": "&lt;",
    "+": "&#43;",
    ",": "&#44;",
}
def escape(text):
    """Return text with each character listed in html_escape_table replaced.

    Characters that have no entry in the table are copied through unchanged.
    """
    pieces = []
    for char in text:
        pieces.append(html_escape_table.get(char, char))
    return "".join(pieces)
def remove_accents(input_str):
    """Return input_str as unicode text with combining accent marks removed.

    The text is NFKD-normalized, which splits an accented character into a
    base character followed by combining marks, and the combining marks are
    then dropped.

    Unicode text is used as is. Byte strings are decoded as UTF-8 (relying
    on the implicit ASCII codec raised UnicodeDecodeError for any non-ASCII
    byte). Any other object is converted with the text type's constructor.
    """
    try:
        text_type = unicode  # Python 2
    except NameError:
        text_type = str  # interpreters where str is already unicode text
    if isinstance(input_str, text_type):
        text = input_str
    elif isinstance(input_str, bytes):
        text = input_str.decode('utf-8')
    else:
        text = text_type(input_str)
    nfkd_form = unicodedata.normalize('NFKD', text)
    return u"".join(c for c in nfkd_form if not unicodedata.combining(c))
class Sonos:
    """Convenience wrapper around one SoCo (0.9) speaker.

    Besides thin pass-throughs to SoCo, the class runs a daemon thread that
    plays "inserted" songs (see insert_song) and afterwards resumes whatever
    was playing before. Notable actions are reported through the iot logger
    unless device is -1.
    """

    def __init__(self, device, ip='192.168.1.66',
                 path='x-file-cifs://192.168.1.3/music/'):
        """Connect to the speaker and start the song-insertion worker.

        device -- id passed to iot.iot_send_string; -1 disables reporting.
        ip     -- address of the Sonos speaker handed to SoCo.
        path   -- URI prefix that is prepended to song file names.
        """
        self.device = device
        self.path = path
        self.debug = False
        self.mysoco = SoCo(ip)
        self.q_song = Queue.Queue()
        self.t_song = threading.Thread(target=self.__worker_insert_song,
                                       args=(self.q_song,))
        # Daemon thread: it must not keep the interpreter alive on exit.
        self.t_song.daemon = True
        self.t_song.start()

    def __iot_send_string(self, event):
        """Report event to the iot logger unless reporting is disabled."""
        if self.device != -1:
            iot.iot_send_string(self.device, event)

    @staticmethod
    def _parse_duration(duration):
        """Convert an 'H:MM:SS' string to seconds; return 0 if malformed.

        Returning 0 instead of raising keeps the worker thread alive when
        the speaker reports a duration that is not in H:MM:SS form.
        """
        try:
            parts = duration.split(':')
            return int(parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2])
        except (AttributeError, IndexError, ValueError):
            return 0

    def __play_song_and_get_info(self, path_and_song, track_info):
        """Play path_and_song now and return (track_info, duration).

        If track_info is empty it is filled with the track that was playing
        before the interruption, or marked with the key 'empty' when nothing
        was playing. duration is the length in seconds of the newly started
        song; it is only determined when something was playing beforehand,
        otherwise it stays 0.
        """
        duration = 0
        transport_info = self.mysoco.get_current_transport_info()
        was_playing = transport_info['current_transport_state'] == 'PLAYING'
        if track_info == {}:
            if was_playing:
                track_info = self.mysoco.get_current_track_info()
                if self.debug:
                    for key in track_info:
                        print(key + track_info[key])
            else:
                track_info['empty'] = 1
        self.mysoco.play_uri(path_and_song)
        # Hard-coded special case: files with 'Bowie' in their path start
        # 30 seconds in.
        if 'Bowie' in path_and_song:
            self.mysoco.seek('00:00:30')
        if was_playing:
            current = self.mysoco.get_current_track_info()
            if 'duration' in current:
                duration = self._parse_duration(current['duration'])
        return (track_info, duration)

    def __resume_previous_song(self, track_info):
        """Restart the track described by track_info.

        Radio streams and tracks without a queue position are restarted by
        URI; queue tracks are restarted from the queue and, when a position
        was recorded, seeked back to it.
        """
        uri = track_info['uri']
        is_stream = 'mp3radio' in uri or 'aac://' in uri
        if is_stream or 'playlist_position' not in track_info:
            self.mysoco.play_uri(uri)
        else:
            # playlist_position is 1-based, play_from_queue is 0-based.
            self.mysoco.play_from_queue(int(track_info['playlist_position']) - 1)
            if 'position' in track_info:
                self.mysoco.seek(track_info['position'])

    def __worker_insert_song(self, q_song):
        """Worker loop: play queued songs, then resume the previous track.

        Polls q_song once per second. While inserted songs are pending they
        are played one after another; track_info keeps describing what was
        playing before the first of them. Once the last inserted song has
        run for its full duration, that earlier track is resumed.

        NOTE(review): when a song is inserted while nothing is playing,
        track_info keeps its 'empty' marker (duration stays 0, so it is never
        reset here); the next insertion therefore does not resume the track
        it interrupts. Left unchanged -- confirm whether that is intended.
        """
        track_info = {}
        duration = 0
        started_at = None
        while True:
            if q_song.qsize() > 0:
                path_and_song = q_song.get()
                track_info, duration = self.__play_song_and_get_info(
                    path_and_song, track_info)
                started_at = datetime.datetime.now()
            elif duration != 0:
                elapsed = (datetime.datetime.now() - started_at).total_seconds()
                if elapsed > duration:
                    if 'empty' not in track_info:
                        self.__resume_previous_song(track_info)
                    track_info = {}
                    duration = 0
            sleep(1)

    def play_song(self, song):
        """Play the file song (relative to self.path) immediately."""
        uri = self.path + song
        self.mysoco.play_uri(uri)
        self.__iot_send_string(uri)

    def play_spotify_song(self, song):
        """Play the URI song as given, without prepending self.path."""
        self.mysoco.play_uri(song)
        self.__iot_send_string(song)

    def insert_song(self, song):
        """Queue the file song for the worker thread to play as an insert."""
        self.q_song.put(self.path + song)

    def _play_shuffled(self, item):
        """Replace the queue with item, start at a random entry, shuffle on."""
        self.mysoco.clear_queue()
        self.mysoco.add_uri_to_queue(item.uri)
        queue_length = len(self.mysoco.get_queue())
        self.mysoco.play_from_queue(int(queue_length * random.random()))
        self.mysoco.play_mode = 'SHUFFLE'

    def play_genre(self, genre):
        """Play, shuffled, every music-library genre titled genre."""
        for item in self.mysoco.get_music_library_information('genres'):
            if item.title == genre:
                self._play_shuffled(item)
                self.__iot_send_string('Genre "' + genre + '" started')
        sleep(0.1)

    def play_playlist(self, playlist):
        """Play, shuffled, every Sonos playlist titled playlist."""
        for item in self.mysoco.get_sonos_playlists():
            if item.title == playlist:
                self._play_shuffled(item)
                self.__iot_send_string('Playlist "' + playlist + '" started')
        sleep(0.1)

    def play_radio(self, radio_station):
        """Queue all favorite radio stations and start the one named.

        The queue is cleared first; every favorite is added, and playback
        starts at the entry whose title equals radio_station.
        """
        favorites = self.mysoco.get_favorite_radio_stations()
        self.mysoco.clear_queue()
        for index, station in enumerate(favorites['favorites']):
            self.mysoco.add_uri_to_queue(station['uri'])
            if station['title'] == radio_station:
                self.mysoco.play_from_queue(index)
                self.__iot_send_string(
                    'Radio station "' + radio_station + '" started')
        sleep(0.1)

    def say(self, text):
        """Render text to a wav file with espeak (voice 'nl') and play it."""
        filename = 'sonos_say.wav'
        if sys.platform == 'win32':
            # presumably w: is the music share mapped as a drive -- confirm
            subprocess.call(['./espeak', '-v', 'nl', '-w', "w:\\" + filename, text])
        else:
            # Every argument is its own list element: without a shell the
            # first element must be the program name only.
            subprocess.call(['espeak', '-v', 'nl', '-w', 'music/' + filename, text])
        self.play_song(filename)
        sleep(0.1)

    def toggle_play(self):
        """Pause when playing, otherwise start playing."""
        state = self.mysoco.get_current_transport_info()['current_transport_state']
        if state == 'PLAYING':
            self.mysoco.pause()
        else:
            self.mysoco.play()
        sleep(0.1)

    def change_volume(self, value):
        """Add value (may be negative) to the current volume."""
        self.mysoco.volume += value
        sleep(0.25)

    def previous(self):
        """Skip to the previous track."""
        self.mysoco.previous()
        sleep(0.1)

    def next(self):
        """Skip to the next track."""
        self.mysoco.next()
        sleep(0.1)

    def get_current_track_info(self):
        """Return SoCo's current track info unchanged."""
        return self.mysoco.get_current_track_info()

    def get_current_transport_info(self):
        """Return SoCo's current transport info unchanged."""
        return self.mysoco.get_current_transport_info()

    def get_current_song_string(self):
        """Return a 'Now playing ...' sentence, or '' when not playing.

        The sentence names title, artist and album when present; without a
        title the part of the URI after '://' is used instead. Accents are
        removed from the result.
        """
        transport_info = self.get_current_transport_info()
        if transport_info['current_transport_state'] != 'PLAYING':
            return ''
        info = self.get_current_track_info()
        song_uri = info['uri']
        if 'mp3radio://' in song_uri or 'aac://' in song_uri:
            radio = ' "radio station" '
        else:
            radio = ''
        song = 'Now playing' + radio
        has_title = len(info['title']) > 0
        if has_title:
            # Append to the prefix; assigning here used to discard it.
            song = song + ' "' + info['title'] + '"'
        if len(info['artist']) > 0:
            song = song + ' by "' + info['artist'] + '"'
        if len(info['album']) > 0:
            song = song + ' from album "' + info['album'] + '"'
        if not has_title and len(song_uri) > 0:
            parts = song_uri.split('://')
            # A URI without '://' has a single part; use it whole.
            song = song + ' "' + (parts[1] if len(parts) > 1 else parts[0]) + '"'
        return remove_accents(song)

    def get_current_song_artist_album(self):
        """Return (song, artist, album, uri) of the playing track.

        Each value has its accents removed and is passed through escape();
        a value is '' when it is missing or when nothing is playing.
        """
        cleaned = {'title': '', 'artist': '', 'album': '', 'uri': ''}
        transport_info = self.get_current_transport_info()
        if transport_info['current_transport_state'] == 'PLAYING':
            info = self.get_current_track_info()
            for key in cleaned:
                if len(info[key]) > 0:
                    cleaned[key] = escape(remove_accents(info[key]))
        return (cleaned['title'], cleaned['artist'], cleaned['album'],
                cleaned['uri'])
iot is a homebrew logging module; its source follows.
import urllib, urllib2, os, traceback, time
def __iot_send(device_id, event=None):
    """Deliver one log event for device_id on a best-effort basis.

    device_id -- integer id of the reporting device.
    event     -- already URL-quoted value, or None for an event without a
                 value.

    On Windows (os.name == 'nt') the event is only printed, unquoted, to
    stdout. Elsewhere it is sent as a GET request to the log.php endpoint;
    any failure while building or sending the request is ignored so that
    logging can never break the caller.
    """
    if os.name == 'nt':
        # event is None when called through iot_send(); unquote(None) raises.
        text = '' if event is None else urllib.unquote(event).decode('utf8')
        print("%d: %s" % (device_id, text))
        return
    try:
        url = 'https://www.picnicprojects.com/casa/log.php?d=' + str(device_id)
        if event is not None:
            url += '&v=' + event
        response = urllib2.urlopen(url)
        response.close()
    except Exception:
        # Deliberate best effort. Unlike a bare except this still lets
        # KeyboardInterrupt and SystemExit through.
        pass
def iot_send(device_id):
    """Log an event for device_id that carries no value."""
    __iot_send(device_id, None)
def iot_send_string(device_id, string):
    """Log the text string for device_id.

    The text is URL-quoted before it is handed on. Unicode text is encoded
    as UTF-8 first: Python 2's urllib.quote raises KeyError on unicode
    objects that contain non-ASCII characters, and __iot_send decodes the
    unquoted value as UTF-8 on Windows.
    """
    if isinstance(string, unicode):
        string = string.encode('utf8')
    __iot_send(device_id, urllib.quote(string))
def iot_send_value(device_id, value):
    """Log one number for device_id, formatted with '%f'."""
    __iot_send(device_id, '%f' % value)
def iot_send_values(device_id, values):
    """Log several numbers for device_id as one comma-separated event.

    Each number is formatted with '%f'. An empty values sequence produces
    an empty event string (building the string by hand left the event
    variable unbound in that case and raised UnboundLocalError). The
    leftover debug printing of values has been dropped.
    """
    event = ','.join(['%f' % v for v in values])
    __iot_send(device_id, event)
def iot_send_trace():
    """Log the traceback of the exception currently being handled.

    Intended to be called from inside an except block. The event is logged
    under device id 999 and goes through iot_send_string so that the
    traceback (which contains spaces, quotes and newlines) is URL-quoted;
    sent raw it produced an invalid request whose failure was silently
    ignored.
    """
    iot_send_string(999, 'exception: ' + traceback.format_exc())