Some working architecture for messages
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
## Known issues:
|
||||
- Configure logger with config file
|
||||
- Support multiple streamer
|
||||
- Post process with ffmpeg
|
||||
- Post process with ffmpeg
|
||||
- Avoid using streamer name. Need to use id instead
|
||||
@@ -38,48 +38,52 @@ class TwitchApi:
|
||||
except:
|
||||
return TwitchStreamStatus.ERROR
|
||||
|
||||
def start_chat(self, streamer_name):
|
||||
def start_chat(self, streamer_name, on_message):
|
||||
logger.info("Connecting to %s:%s", TW_CHAT_SERVER, TW_CHAT_PORT)
|
||||
connection = ChatConnection(streamer_name, self.twitch)
|
||||
connection = ChatConnection(streamer_name, self, on_message)
|
||||
|
||||
self.twitch.get_app_token()
|
||||
connection.run()
|
||||
|
||||
def get_user_chat_channel(self, streamer_name):
|
||||
streams = self.twitch.get_streams(user_login=streamer_name)
|
||||
if streams is None or len(streams) < 1:
|
||||
return None
|
||||
return streams["data"][0]["user_login"]
|
||||
|
||||
|
||||
class ChatConnection:
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
connection = None
|
||||
|
||||
def __init__(self, streamer_name, twitch):
|
||||
self.twitch = twitch
|
||||
def __init__(self, streamer_name, api, on_message):
|
||||
self.on_message = on_message
|
||||
self.api = api
|
||||
self.streamer_name = streamer_name
|
||||
|
||||
def run(self):
|
||||
# Need to verify channel name.. case sensative
|
||||
streams = self.twitch.get_streams(user_login=self.streamer_name)
|
||||
if streams is None or len(streams) < 1:
|
||||
return
|
||||
# Need to verify channel name.. case sensitive
|
||||
channel = self.api.get_user_chat_channel(self.streamer_name)
|
||||
if not channel:
|
||||
logger.error("Cannot find streamer channel")
|
||||
|
||||
channel = streams["data"][0]["user_login"]
|
||||
self.connect_to_chat(f"#{channel}")
|
||||
|
||||
def connect_to_chat(self, channel):
|
||||
self.connection = socket.socket()
|
||||
self.connection.connect((TW_CHAT_SERVER, TW_CHAT_PORT))
|
||||
self.connection.send(f"PASS sdwrerrwsdawerew\n".encode("utf-8"))
|
||||
# public data to join hat
|
||||
self.connection.send(f"PASS couldBeRandomString\n".encode("utf-8"))
|
||||
self.connection.send(f"NICK justinfan123\n".encode("utf-8"))
|
||||
self.connection.send(f"JOIN #{channel}\n".encode("utf-8"))
|
||||
self.connection.send(f"JOIN {channel}\n".encode("utf-8"))
|
||||
|
||||
while True:
|
||||
resp = self.connection.recv(4096).decode('utf-8')
|
||||
logger.warning(f"Message: {resp}")
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
def disconnect(self, msg="I'll be back!"):
|
||||
logger.info("Disconnected ")
|
||||
|
||||
def on_welcome(self, c, e):
|
||||
logger.info("Joining channel ")
|
||||
c.join("#" + self.streamer_name)
|
||||
logger.info("Joined????? ")
|
||||
|
||||
def on_pubmsg(self, c, e):
|
||||
logger.info("On message %s <-> %s", c, e)
|
||||
try:
|
||||
while True:
|
||||
msg = self.connection.recv(4096).decode('utf-8')
|
||||
logger.warning(f"Twitch message-> {msg}")
|
||||
if self.on_message:
|
||||
self.on_message(msg)
|
||||
except BaseException as e:
|
||||
logger.error(e)
|
||||
logger.error("Error happened during reading chat")
|
||||
|
||||
@@ -1,20 +1,40 @@
|
||||
import logging
|
||||
import os
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def parse_msg(msg):
|
||||
"""Breaks a message from an IRC server into its prefix, command, and arguments.
|
||||
"""
|
||||
prefix = ''
|
||||
trailing = []
|
||||
if not msg:
|
||||
raise ValueError("Empty line.")
|
||||
|
||||
if msg[0] == ':':
|
||||
prefix, msg = msg[1:].split(' ', 1)
|
||||
if msg.find(' :') != -1:
|
||||
msg, trailing = msg.split(' :', 1)
|
||||
args = msg.split()
|
||||
args.append(trailing)
|
||||
else:
|
||||
args = msg.split()
|
||||
command = args.pop(0)
|
||||
return prefix, command, args
|
||||
|
||||
|
||||
class TwitchChatRecorder:
|
||||
def __init__(self, api, streamer_name, ignored_prefix=["!"], on_finish=None):
|
||||
self.ignored_prefix = ignored_prefix
|
||||
self.on_finish = on_finish
|
||||
def __init__(self, api, streamer_name, recording_folder):
|
||||
self.recording_folder = recording_folder
|
||||
self.streamer_name = streamer_name
|
||||
self.api = api
|
||||
|
||||
def run(self):
|
||||
self.api.start_chat(self.streamer_name)
|
||||
def run(self, file_template):
|
||||
file_name = os.path.join(self.recording_folder, f"{file_template}.txt", )
|
||||
with open(file_name, "w") as stream:
|
||||
def on_message(twitch_msg):
|
||||
prefix, command, args = parse_msg(twitch_msg)
|
||||
stream.writelines(str(args))
|
||||
|
||||
def on_error(self, error):
|
||||
logger.error(error)
|
||||
|
||||
def on_message(self, msg):
|
||||
logger.info("New message %s", msg)
|
||||
self.api.start_chat(self.streamer_name, on_message)
|
||||
|
||||
@@ -1,16 +1,19 @@
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
import sys
|
||||
from datetime import datetime
|
||||
|
||||
from clipper.api import TwitchApi, TwitchStreamStatus
|
||||
from clipper.chat import TwitchChatRecorder
|
||||
from clipper.video import TwitchVideoRecorder
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RecorderConfig:
|
||||
def __init__(self, tw_client, tw_secret, tw_streamer, tw_quality, output_path):
|
||||
self.output_path = output_path
|
||||
def __init__(self, tw_client, tw_secret, tw_streamer, tw_quality, output_folder):
|
||||
self.output_folder = output_folder
|
||||
self.tw_quality = tw_quality
|
||||
self.tw_streamer = tw_streamer
|
||||
self.tw_secret = tw_secret
|
||||
@@ -18,64 +21,43 @@ class RecorderConfig:
|
||||
|
||||
|
||||
class Recorder:
|
||||
audio_thread = None
|
||||
video_thread = None
|
||||
|
||||
def __init__(self, config):
|
||||
self.config = config
|
||||
self.api = TwitchApi(config.tw_client, config.tw_secret)
|
||||
# self.video_recorder = TwitchVideoRecorder(self.api,)
|
||||
self.chat_recorder = TwitchChatRecorder(self.api, config.tw_streamer)
|
||||
self.recording_folder = os.path.join(self.config.output_folder, self.config.tw_streamer)
|
||||
self.video_recorder = TwitchVideoRecorder(self.api, config.tw_streamer, self.recording_folder)
|
||||
self.chat_recorder = TwitchChatRecorder(self.api, config.tw_streamer, self.recording_folder)
|
||||
|
||||
def run(self):
|
||||
if os.path.isdir(self.recording_folder) is False:
|
||||
logger.info("Recording folder `%s` does not exists. Create it", self.recording_folder)
|
||||
os.makedirs(self.recording_folder)
|
||||
|
||||
while True:
|
||||
logger.info("Start watching streamer %s", self.config.tw_streamer)
|
||||
status = self.api.get_user_status(self.config.tw_streamer)
|
||||
if status == TwitchStreamStatus.ONLINE:
|
||||
logger.info("Streamer %s is online. Start recording", self.config.tw_streamer)
|
||||
self.chat_recorder.run()
|
||||
# TODO run video record and join to it.. Run 2 threads?
|
||||
logger.info("Streamer %s finished his stream", self.config.tw_streamer)
|
||||
|
||||
now = datetime.now()
|
||||
file_template = "{0}-{1}".format(self.config.tw_streamer, now.strftime("%H-%M-%S"))
|
||||
self.chat_recorder.run(file_template)
|
||||
# self.video_recorder.run(file_template)
|
||||
|
||||
logger.info("Streamer %s has finished stream", self.config.tw_streamer)
|
||||
time.sleep(15)
|
||||
|
||||
if status == TwitchStreamStatus.OFFLINE:
|
||||
logger.info("Streamer %s is offline. Waiting for it 60 sec", self.config.tw_streamer)
|
||||
time.sleep(60)
|
||||
logger.info("Streamer %s is offline. Waiting for 300 sec", self.config.tw_streamer)
|
||||
time.sleep(300)
|
||||
|
||||
if status == TwitchStreamStatus.ERROR:
|
||||
logger.critical("Error occurred. Exit", self.config.tw_streamer)
|
||||
sys.exit(1)
|
||||
|
||||
# def run(self):
|
||||
# logger.info("Start watching streamer %s", self.config.tw_streamer)
|
||||
#
|
||||
# while True:
|
||||
# status, info = self.api.check_user_status(self.config.tw_streamer)
|
||||
# if status == TwitchStreamStatus.NOT_FOUND:
|
||||
# logger.error("streamer_name not found, invalid streamer_name or typo")
|
||||
# sys.exit(1)
|
||||
# elif status == TwitchStreamStatus.ERROR:
|
||||
# logger.error("%s unexpected error. will try again in 5 minutes",
|
||||
# datetime.datetime.now().strftime("%Hh%Mm%Ss"))
|
||||
# time.sleep(300)
|
||||
# elif status == TwitchStreamStatus.OFFLINE:
|
||||
# logger.info("%s currently offline, checking again in %s seconds", self.streamer_name,
|
||||
# self.refresh_timeout)
|
||||
# time.sleep(self.refresh_timeout)
|
||||
# elif status == TwitchStreamStatus.UNAUTHORIZED:
|
||||
# logger.info("unauthorized, will attempt to log back in immediately")
|
||||
# self.access_token = self.authenticator.refresh_token()
|
||||
# elif status == TwitchStreamStatus.ONLINE:
|
||||
# logger.info("%s online, stream recording in session", self.streamer_name)
|
||||
#
|
||||
# channels = info["data"]
|
||||
# channel = next(iter(channels), None)
|
||||
#
|
||||
# recorded_filename = self.record_stream(channel, recording_path)
|
||||
#
|
||||
# logger.info("recording stream is done")
|
||||
#
|
||||
# if self.on_finish is not None:
|
||||
# self.on_finish(channel, recorded_filename)
|
||||
#
|
||||
# time.sleep(self.refresh_timeout)
|
||||
|
||||
# def start_record(self):
|
||||
# pass
|
||||
if status == TwitchStreamStatus.NOT_FOUND:
|
||||
logger.critical(f"Streamer %s not found, invalid streamer_name or typo", self.config.tw_streamer)
|
||||
sys.exit(1)
|
||||
|
||||
@@ -9,11 +9,11 @@ logger = logging.getLogger(__name__)
|
||||
class TwitchVideoRecorder:
|
||||
access_token = None
|
||||
|
||||
def __init__(self, authenticator, streamer_name, output_path, quality="480p", on_finish=None):
|
||||
def __init__(self, authenticator, streamer_name, recording_folder, quality="480p", on_finish=None):
|
||||
# global configuration
|
||||
self.disable_ffmpeg = False
|
||||
self.refresh_timeout = 15
|
||||
self.output_path = output_path
|
||||
self.recording_folder = recording_folder
|
||||
self.stream_uid = None
|
||||
self.on_finish = on_finish
|
||||
|
||||
@@ -23,13 +23,6 @@ class TwitchVideoRecorder:
|
||||
self.authenticator = authenticator
|
||||
|
||||
def run(self):
|
||||
# path to recorded stream
|
||||
recording_path = os.path.join(self.output_path, "recorded", self.streamer_name)
|
||||
|
||||
# create directory for recordedPath and processedPath if not exist
|
||||
if os.path.isdir(recording_path) is False:
|
||||
os.makedirs(recording_path)
|
||||
|
||||
# make sure the interval to check user availability is not less than 15 seconds
|
||||
if self.refresh_timeout < 15:
|
||||
logger.warning("check interval should not be lower than 15 seconds")
|
||||
|
||||
59
main.py
59
main.py
@@ -12,7 +12,8 @@ def parse_arguments():
|
||||
parser.add_argument('--secret', "-s", help='Twitch secret id', required=True, dest="tw_secret")
|
||||
parser.add_argument('--streamer', "-t", help='Twitch streamer username', required=True, dest="tw_streamer")
|
||||
parser.add_argument('--quality', "-q", help='Video downloading quality', dest="tw_quality", default="360p")
|
||||
parser.add_argument('--output_path', "-o", help='Video download folder', dest="output_path", default=os.getcwd())
|
||||
parser.add_argument('--output_path', "-o", help='Video download folder', dest="output_path",
|
||||
default=os.path.join(os.getcwd(), "recorded"))
|
||||
|
||||
return parser.parse_args()
|
||||
|
||||
@@ -27,62 +28,10 @@ def on_chat_recorded(streamer, filename):
|
||||
|
||||
if __name__ == "__main__":
|
||||
# TODO configure logging
|
||||
# TODO rework authentication and status check. recorder should only record
|
||||
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
|
||||
args = parse_arguments()
|
||||
|
||||
config = recorder.RecorderConfig(args.tw_client, args.tw_secret, args.tw_streamer, args.tw_quality, args.output_path)
|
||||
config = recorder.RecorderConfig(args.tw_client, args.tw_secret, args.tw_streamer, args.tw_quality,
|
||||
args.output_path)
|
||||
recorder = recorder.Recorder(config)
|
||||
recorder.run()
|
||||
|
||||
# Twitch downloader
|
||||
# def main(argv):
|
||||
# twitch_recorder = TwitchRecorder()
|
||||
# usage_message = "twitch-recorder.py -u <username> -q <quality>"
|
||||
# logging.basicConfig(filename="twitch-recorder.log", level=logging.INFO)
|
||||
# logging.getLogger().addHandler(logging.StreamHandler())
|
||||
#
|
||||
# try:
|
||||
# opts, args = getopt.getopt(argv, "hu:q:l:",
|
||||
# ["username=", "quality=", "log=", "logging=", "disable-ffmpeg", 'uid='])
|
||||
# except getopt.GetoptError:
|
||||
# print(usage_message)
|
||||
# sys.exit(2)
|
||||
# print(opts)
|
||||
# for opt, arg in opts:
|
||||
# if opt == "-h":
|
||||
# print(usage_message)
|
||||
# sys.exit()
|
||||
# elif opt in ("-u", "--username"):
|
||||
# twitch_recorder.username = arg
|
||||
# elif opt in ("-q", "--quality"):
|
||||
# twitch_recorder.quality = arg
|
||||
# elif opt in ("-l", "--log", "--logging"):
|
||||
# logging_level = getattr(logging, arg.upper(), None)
|
||||
# if not isinstance(logging_level, int):
|
||||
# raise ValueError("invalid log level: %s" % logging_level)
|
||||
# logging.basicConfig(level=logging_level)
|
||||
# logging.info("logging configured to %s", arg.upper())
|
||||
# elif opt in "--uid":
|
||||
# twitch_recorder.stream_uid = arg
|
||||
# elif opt == "--disable-ffmpeg":
|
||||
# twitch_recorder.disable_ffmpeg = True
|
||||
# logging.info("ffmpeg disabled")
|
||||
#
|
||||
# twitch_recorder.run()
|
||||
#
|
||||
#
|
||||
# if __name__ == "__main__":
|
||||
# main(sys.argv[1:])
|
||||
|
||||
# # fix videos from previous recording session
|
||||
# try:
|
||||
# video_list = [f for f in os.listdir(recorded_path) if os.path.isfile(os.path.join(recorded_path, f))]
|
||||
# if len(video_list) > 0:
|
||||
# logging.info("processing previously recorded files")
|
||||
# for f in video_list:
|
||||
# recorded_filename = os.path.join(recorded_path, f)
|
||||
# processed_filename = os.path.join(processed_path, f)
|
||||
# self.process_recorded_file(recorded_filename, processed_filename)
|
||||
# except Exception as e:
|
||||
# logging.error(e)
|
||||
|
||||
Reference in New Issue
Block a user