I wrote a Python wrapper class around SoCo 0.9 because SoCo's documentation is very limited. I hope it helps 🙂
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 |
import datetime
import random
import subprocess
import sys
import threading
import unicodedata
from time import sleep

try:  # Python 2
    import Queue
except ImportError:  # Python 3
    import queue as Queue

from soco import SoCo  # used SoCo version 0.9

import iot

try:
    _text_type = unicode  # Python 2
except NameError:
    _text_type = str  # Python 3

# Characters replaced by escape().  The first five entries are the standard
# HTML entities.
# NOTE(review): the published listing shows every entity already decoded
# (e.g. "&": "&"), so the targets for "+" and "," are assumed to be numeric
# character references -- confirm against the original source.
html_escape_table = {
    "&": "&amp;",
    '"': "&quot;",
    "'": "&apos;",
    ">": "&gt;",
    "<": "&lt;",
    "+": "&#43;",
    ",": "&#44;",
}


def escape(text):
    """Return *text* with every character in html_escape_table replaced."""
    return "".join(html_escape_table.get(c, c) for c in text)


def remove_accents(input_str):
    """Return *input_str* as text with all combining marks removed.

    The input is NFKD-normalized, which splits accented characters into a
    base character plus combining marks, and the combining marks are then
    dropped.  Byte strings are decoded as UTF-8 first; any other object is
    converted with the text type's constructor.
    """
    if isinstance(input_str, bytes):
        text = input_str.decode('utf-8')
    else:
        text = _text_type(input_str)
    nfkd_form = unicodedata.normalize('NFKD', text)
    return u"".join(c for c in nfkd_form if not unicodedata.combining(c))


def _duration_to_seconds(text):
    """Convert an ``H:MM:SS`` string to seconds; return 0 if unparsable."""
    try:
        hours, minutes, seconds = (int(part) for part in text.split(':'))
    except (AttributeError, ValueError):
        # An unparsable duration must not kill the worker thread; 0 means
        # "nothing to wait for" to the caller.
        return 0
    return hours * 3600 + minutes * 60 + seconds


class Sonos(object):
    """Convenience wrapper around a single SoCo player.

    Besides thin pass-through methods, it runs a daemon thread that plays
    "inserted" songs and afterwards resumes whatever was playing before.
    """

    def __init__(self, device, ip='192.168.1.66',
                 path='x-file-cifs://192.168.1.3/music/'):
        """Connect to the player and start the insert-song worker thread.

        device -- id passed to iot.iot_send_string for event logging;
                  -1 disables logging.
        ip     -- address handed to SoCo (defaults to the original
                  hard-coded value).
        path   -- URI prefix prepended to song names by play_song and
                  insert_song (defaults to the original hard-coded value).
        """
        self.device = device
        self.path = path
        self.debug = False
        self.mysoco = SoCo(ip)
        self.q_song = Queue.Queue()
        self.t_song = threading.Thread(target=self.__worker_insert_song,
                                       args=(self.q_song, ))
        # Daemon thread: it loops forever and must not block interpreter exit.
        self.t_song.daemon = True
        self.t_song.start()

    def __iot_send_string(self, event):
        """Log *event* through the iot module unless logging is disabled."""
        if self.device != -1:
            iot.iot_send_string(self.device, event)

    def __play_song_and_get_info(self, path_and_song, track_info):
        """Play *path_and_song* and return ``(track_info, duration)``.

        If *track_info* is empty it is filled with the info of the track
        that was playing before the switch, or marked with an 'empty' key
        when nothing was playing.  *duration* is the length in seconds of
        the newly started track, and stays 0 when the player was not in
        the PLAYING state before the switch.
        """
        duration = 0
        transport_info = self.mysoco.get_current_transport_info()
        was_playing = transport_info['current_transport_state'] == 'PLAYING'
        if track_info == {}:
            if was_playing:
                track_info = self.mysoco.get_current_track_info()
                if self.debug:
                    for key in track_info:
                        print('%s%s' % (key, track_info[key]))
            else:
                track_info['empty'] = 1
        self.mysoco.play_uri(path_and_song)
        # NOTE(review): hard-coded special case from the original author;
        # the reason for starting these tracks at 30 s is not visible here.
        if 'Bowie' in path_and_song:
            self.mysoco.seek('00:00:30')
        if was_playing:
            current = self.mysoco.get_current_track_info()
            if 'duration' in current:
                duration = _duration_to_seconds(current['duration'])
        return (track_info, duration)

    def __resume_previous_song(self, track_info):
        """Restart the track described by *track_info*.

        Radio streams and tracks without a playlist position are restarted
        by URI; queue tracks are restarted from the queue and, if a
        position was saved, sought to it.
        """
        uri = track_info['uri']
        is_stream = 'mp3radio' in uri or 'aac://' in uri
        if is_stream or 'playlist_position' not in track_info:
            self.mysoco.play_uri(uri)
        else:
            self.mysoco.play_from_queue(
                int(track_info['playlist_position']) - 1)
            # NOTE(review): the published listing lost its indentation; the
            # seek is assumed to belong to the queue branch -- confirm.
            if 'position' in track_info:
                self.mysoco.seek(track_info['position'])

    def __worker_insert_song(self, q_song):
        """Thread body: play queued songs, then resume the previous track.

        Polls once per second.  When a song is queued it is played at once;
        when the played song's duration has elapsed and a previous track
        was saved, that track is resumed.
        """
        track_info = {}
        duration = 0
        started = None
        while 1:
            if self.q_song.qsize() > 0:
                path_and_song = q_song.get()
                track_info, duration = self.__play_song_and_get_info(
                    path_and_song, track_info)
                started = datetime.datetime.now()
            elif duration != 0:
                elapsed = (datetime.datetime.now() - started).total_seconds()
                if elapsed > duration:
                    # track_info is a dict: the original compared it with
                    # [] which is always unequal, so test truthiness.
                    if track_info and 'empty' not in track_info:
                        self.__resume_previous_song(track_info)
                    track_info = {}
                    duration = 0
            sleep(1)

    def __play_shuffled(self, items, title, kind):
        """Queue each item titled *title*, start at a random index, shuffle.

        *kind* is the label used in the logged event ('Genre'/'Playlist').
        """
        for item in items:
            if item.title == title:
                self.mysoco.clear_queue()
                self.mysoco.add_uri_to_queue(item.uri)
                queue_length = len(self.mysoco.get_queue())
                self.mysoco.play_from_queue(
                    int(queue_length * random.random()))
                self.mysoco.play_mode = 'SHUFFLE'
                self.__iot_send_string(kind + ' "' + title + '" started')
        sleep(0.1)

    def play_song(self, song):
        """Play *song* (relative to self.path) immediately and log it."""
        self.mysoco.play_uri(self.path + song)
        self.__iot_send_string(self.path + song)

    def play_spotify_song(self, song):
        """Play the URI *song* as given (no path prefix) and log it."""
        self.mysoco.play_uri(song)
        self.__iot_send_string(song)

    def insert_song(self, song):
        """Queue *song* (relative to self.path) for the worker thread."""
        self.q_song.put(self.path + song)

    def play_genre(self, genre):
        """Replace the queue with the library genre *genre*, shuffled."""
        genres = self.mysoco.get_music_library_information('genres')
        self.__play_shuffled(genres, genre, 'Genre')

    def play_playlist(self, playlist):
        """Replace the queue with the Sonos playlist *playlist*, shuffled."""
        playlists = self.mysoco.get_sonos_playlists()
        self.__play_shuffled(playlists, playlist, 'Playlist')

    def play_radio(self, radio_station):
        """Queue all favorite radio stations and play *radio_station*."""
        stations = self.mysoco.get_favorite_radio_stations()
        self.mysoco.clear_queue()
        for index, station in enumerate(stations['favorites']):
            self.mysoco.add_uri_to_queue(station['uri'])
            if station['title'] == radio_station:
                self.mysoco.play_from_queue(index)
                self.__iot_send_string(
                    'Radio station "' + radio_station + '" started')
        sleep(0.1)

    def say(self, text):
        """Render *text* to a wav file with espeak and play that file."""
        filename = 'sonos_say.wav'
        if sys.platform == 'win32':
            command = ['./espeak', '-v', 'nl', '-w', "w:\\" + filename, text]
        else:
            # Each argument must be its own list element; the original
            # passed the whole command line as the program name.
            command = ['espeak', '-v', 'nl', '-w', 'music/' + filename, text]
        subprocess.call(command)
        self.play_song(filename)
        sleep(0.1)

    def toggle_play(self):
        """Pause when the player is PLAYING, otherwise start playing."""
        state = self.mysoco.get_current_transport_info()
        if state['current_transport_state'] == 'PLAYING':
            self.mysoco.pause()
        else:
            self.mysoco.play()
        sleep(0.1)

    def change_volume(self, value):
        """Add *value* (may be negative) to the current volume."""
        self.mysoco.volume += value
        sleep(0.25)

    def previous(self):
        """Call the player's previous()."""
        self.mysoco.previous()
        sleep(0.1)

    def next(self):
        """Call the player's next()."""
        self.mysoco.next()
        sleep(0.1)

    def get_current_track_info(self):
        """Return SoCo's current track info unchanged."""
        return self.mysoco.get_current_track_info()

    def get_current_transport_info(self):
        """Return SoCo's current transport info unchanged."""
        return self.mysoco.get_current_transport_info()

    def get_current_song_string(self):
        """Return an accent-free 'Now playing ...' description.

        Returns '' when the player is not in the PLAYING state.  When the
        track has no title, the part of its URI after '://' is used.
        """
        transport_info = self.get_current_transport_info()
        if transport_info['current_transport_state'] != 'PLAYING':
            return ''
        info = self.get_current_track_info()
        song_uri = info['uri']
        if 'mp3radio://' in song_uri or 'aac://' in song_uri:
            radio = ' "radio station" '
        else:
            radio = ''
        song = 'Now playing' + radio
        has_title = len(info['title']) > 0
        if has_title:
            # Append to the prefix; the original assigned here and thereby
            # discarded 'Now playing' whenever a title was present.
            song = song + ' "' + info['title'] + '"'
        if len(info['artist']) > 0:
            song = song + ' by "' + info['artist'] + '"'
        if len(info['album']) > 0:
            song = song + ' from album "' + info['album'] + '"'
        if not has_title and len(song_uri) > 0:
            parts = song_uri.split('://')
            # Fall back to the whole URI when it contains no '://'.
            tail = parts[1] if len(parts) > 1 else parts[0]
            song = song + ' "' + tail + '"'
        return remove_accents(song)

    def get_current_song_artist_album(self):
        """Return ``(song, artist, album, uri)`` of the playing track.

        Each value has accents removed and is passed through escape().
        All four are '' when the player is not in the PLAYING state or the
        field is empty.
        """
        fields = ('title', 'artist', 'album', 'uri')
        values = dict.fromkeys(fields, '')
        transport_info = self.get_current_transport_info()
        if transport_info['current_transport_state'] == 'PLAYING':
            info = self.get_current_track_info()
            for key in fields:
                if len(info[key]) > 0:
                    values[key] = escape(remove_accents(info[key]))
        return (values['title'], values['artist'],
                values['album'], values['uri'])
`iot` is a homebrew logging module:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
import os
import time
import traceback
import urllib

try:  # Python 2
    import urllib2
    from urllib import quote as _quote, unquote as _unquote
    _urlopen = urllib2.urlopen
except ImportError:  # Python 3
    from urllib.parse import quote as _quote, unquote as _unquote
    from urllib.request import urlopen as _urlopen


def __iot_send(device_id, event=None):
    """Deliver one log event for *device_id*.

    *event* is expected to be URL-quoted already (or None for "no value").
    On Windows (os.name == 'nt') the event is unquoted and printed; on
    every other platform it is sent as an HTTP GET to the logging URL.
    Delivery is best effort: failures of the request are ignored.
    """
    if os.name == 'nt':
        # None used to crash in unquote(); treat it as an empty event.
        text = _unquote(event) if event is not None else ''
        if isinstance(text, bytes):
            text = text.decode('utf8')
        print("%d: %s" % (device_id, text))
        return
    url = 'https://www.picnicprojects.com/casa/log.php?d=' + str(device_id)
    if event is not None:
        url += '&v=' + event
    try:
        _urlopen(url).close()
    except Exception:
        # Logging must never break the caller, so request errors are
        # dropped on purpose.  Unlike a bare except, this still lets
        # KeyboardInterrupt and SystemExit propagate.
        pass


def iot_send(device_id):
    """Log an event without a value for *device_id*."""
    __iot_send(device_id)


def iot_send_string(device_id, string):
    """Log the text *string* for *device_id* (URL-quoted before sending)."""
    if not isinstance(string, bytes):
        # Encode text first so non-ASCII characters can be quoted on
        # Python 2 as well.
        string = string.encode('utf8')
    __iot_send(device_id, _quote(string))


def iot_send_value(device_id, value):
    """Log the number *value* for *device_id*, formatted with '%f'."""
    __iot_send(device_id, '%f' % value)


def iot_send_values(device_id, values):
    """Log several numbers as one comma-separated '%f' list.

    Nothing is sent when *values* is empty (this used to raise because the
    event string was never created).  The debug prints of the original
    listing are dropped.
    """
    parts = ['%f' % v for v in values]
    if not parts:
        return
    __iot_send(device_id, ','.join(parts))


def iot_send_trace():
    """Log the current exception's traceback under device id 999."""
    # Quote the traceback: it contains spaces and newlines, which are not
    # valid inside the request URL.
    __iot_send(999, _quote('exception: ' + traceback.format_exc()))