First Try at Twitch AI Chat Assistant and Chatbot

July 1, 2023

After looking at what Neuro-sama does, I figured I'd dust off an old project I had been working on: a Twitch overlay. On my first try, I wanted to keep costs down by running the AI chatbot locally on my computer, but I ran into a few issues. Still, I thought it was a good idea to try making the 3D model on my overlay more interactive with the AI.

Loading the Model into Unity3d

For my overlay, I was using Unity3d, so I wrote code to import the GODEL model into my project. Below is the code that downloads the model and places it into my Unity3d project via the drop-down menu it creates. I used Python Scripting a lot for this project.


using UnityEditor;
using UnityEditor.Scripting.Python;
using System.IO;

public class ModelDownloader
{
    [MenuItem("Python/Download Godel Model")]
    static void DownloadModel()
    {
        // Set the directory to save the downloaded model
        string saveDirectory = "G:\\Unity Projects\\Vtuber\\Assets\\ML_Models\\GODELv1_1largeseq2seq";
        Directory.CreateDirectory(saveDirectory);

        // Set the name of the downloaded model file
        string modelName = "pytorch_model.bin";

        // Set the Hugging Face model name to download
        //string hfModelName = "microsoft/GODEL-v1_1-large-seq2seq";
        string hfModelName = "microsoft/GODEL-v1_1-base-seq2seq";
        // Set the code to download and save the model file
        string script = $@"
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
import os

hf_model_name = ""{hfModelName}""
save_dir = r""{saveDirectory}""
model_name = ""{modelName}""

# Download the model from Hugging Face
model = AutoModelForSeq2SeqLM.from_pretrained(hf_model_name)
tokenizer = AutoTokenizer.from_pretrained(hf_model_name)

# Save the model and tokenizer to the specified directory
model.save_pretrained(save_dir)
tokenizer.save_pretrained(save_dir)
";

        // Run the Python script to download and save the model file
        PythonRunner.RunString(script);
    }
}
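A quick way to confirm the download landed is to check that `save_pretrained` wrote the expected files into the target directory. This is only a sketch: the filenames below are the usual Hugging Face checkpoint layout, not something tied to my exact install.

```python
import os

# The files save_pretrained() typically writes for a seq2seq checkpoint
EXPECTED = ["config.json", "pytorch_model.bin", "tokenizer_config.json"]

def missing_files(save_dir):
    """Return the expected checkpoint files that are absent from save_dir."""
    return [name for name in EXPECTED
            if not os.path.isfile(os.path.join(save_dir, name))]

# Example: point this at the saveDirectory used in the C# menu item
print(missing_files("."))
```

If the list comes back non-empty, the download step failed silently and the later load will error out.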

Test Model

After importing the model, I had to test whether it worked. The code below adds two more menu items: one to sanity-check the Python install and one to run the model.


using System.IO;
using UnityEngine;
using UnityEditor.Scripting.Python;
using UnityEditor;

public class GodelModelRunner
{
    [MenuItem("Python/Check TensorFlow Installation")]
    static void CheckTensorFlowInstallation()
    {
        string script = @"
import tensorflow as tf

# Check available devices
devices = tf.config.list_physical_devices()
cpu_devices = [device for device in devices if 'CPU' in device.device_type]

print(""Available CPU devices:"")
for device in cpu_devices:
    print(device)
";

        PythonRunner.RunString(script);
    }

    [MenuItem("Python/Run Godel Model")]
    static void RunModel()
    {
        // Set the directory containing the model
        string modelDirectory = "G:\\Unity Projects\\Vtuber\\Assets\\ML_Models\\GODELv1_1largeseq2seq";

        // Set the name of the model file
        string modelName = "pytorch_model.bin";

        // Set the code to run the model
        string script = $@"
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'  # Use CPU only

import torch
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer

model_dir = r""{modelDirectory}""
model_name = ""{modelName}""

# Load the model and tokenizer
model = AutoModelForSeq2SeqLM.from_pretrained(model_dir)
tokenizer = AutoTokenizer.from_pretrained(model_dir)

# Preprocess the input text
input_text = ""Hello, how are you doing?""
input_ids = tokenizer.encode(input_text, return_tensors='pt')

# Generate the output
output = model.generate(input_ids)

# Decode and print the output
decoded_output = tokenizer.decode(output[0], skip_special_tokens=True)
print(decoded_output)
";

        // Run the Python script to run the model
        PythonRunner.RunString(script);
    }
}

Run Model In-Game Layout

Once I confirmed the model worked, I integrated the code into my Twitch layout. It keeps running, pulling new chat messages, feeding them through the GODEL model, and then passing the output string to another script that handles Text-to-Speech. The code also runs Python outside of Unity3d, which is an issue in itself.


using UnityEngine;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Text;

public class Godelinteraction : MonoBehaviour
{
    private string pythonScriptPath;
    private string outputText;
    public ChatBox chatBox;

    private void Start()
    {
        // Specify the path to the Python script that handles the model and tokenizer
        pythonScriptPath = Application.dataPath + "/Code/AICode/modelLoaded.py";

        // Start the coroutine to accept input and communicate with the Python process
        StartCoroutine(AcceptInputCoroutine());
    }

    private IEnumerator AcceptInputCoroutine()
    {
        while (true)
        {
            // Wait for user input
            yield return new WaitForSeconds(1f); // Adjust as needed

            // User pressed Enter, get input_text
            string inputText = GetRandomInputTextFromChat();
            UnityEngine.Debug.Log("GOT MESSAGE: " + inputText);
            if (!string.IsNullOrEmpty(inputText))
            {
                chatMessages.Clear();
                userMessages.Clear();

                // Execute the Python script and pass the input_text as an argument
                yield return StartCoroutine(ExecutePythonScriptCoroutine(inputText));

                UnityEngine.Debug.Log("Coroutine Ended");

                // Censor the output text if it contains any forbidden words
                string censoredOutputText = CensorshipManager.CensorText(outputText);

                // Log the output text
                UnityEngine.Debug.Log("Output: " + censoredOutputText);

                // Handle the censored output text as needed in Unity
                chatBox.NewText(censoredOutputText);
            }
        }
    }

    private void OnOutputDataReceived(object sender, DataReceivedEventArgs e)
    {
        if (!string.IsNullOrEmpty(e.Data))
        {
            // Capture the output from the Python script
            outputText = e.Data;

            // Log the output text
            UnityEngine.Debug.Log("Output: " + outputText);
            // Handle the generated output text as needed in Unity
        }
    }

    private IEnumerator ExecutePythonScriptCoroutine(string inputText)
    {
        string pythonExecutablePath = @"G:\Unity Projects\Vtuber\Library\PythonInstall\python.exe";

        // Set the PYTHONPATH environment variable
        string pythonEnv = Path.Combine(Application.dataPath, "PythonPackages") + ";" + Application.dataPath + "/Library/PythonInstall/Lib/site-packages";

        string pythonDirectory = Path.GetDirectoryName(pythonExecutablePath);

        // Add the input text as an argument to the Python script
        string arguments = $"\"{pythonScriptPath}\" \"{inputText}\"";

        Dictionary<string, string> environmentVariables = new Dictionary<string, string>();
        environmentVariables["PYTHONPATH"] = pythonEnv;

        // Execute the Python script using SpawnProcess
        yield return SpawnProcess(pythonExecutablePath, arguments, environmentVariables, true, true, false, true);
    }

    private IEnumerator SpawnProcess(string programName, string arguments, Dictionary<string, string> environmentVariables, bool redirectOutput, bool redirectError, bool redirectInput, bool createNoWindow)
    {
        ProcessStartInfo startInfo = new ProcessStartInfo(programName, arguments)
        {
            RedirectStandardOutput = redirectOutput,
            RedirectStandardError = redirectError,
            RedirectStandardInput = redirectInput,
            UseShellExecute = false,
            CreateNoWindow = createNoWindow
        };

        if (environmentVariables != null)
        {
            foreach (var kvp in environmentVariables)
            {
                startInfo.EnvironmentVariables[kvp.Key] = kvp.Value;
            }
        }

        Process process = new Process();
        process.StartInfo = startInfo;
        process.OutputDataReceived += OnOutputDataReceived;
        process.ErrorDataReceived += OnErrorDataReceived;
        process.EnableRaisingEvents = true;

        StringBuilder outputBuilder = new StringBuilder();
        StringBuilder errorBuilder = new StringBuilder();

        process.OutputDataReceived += (sender, e) =>
        {
            if (!string.IsNullOrEmpty(e.Data))
            {
                outputBuilder.AppendLine(e.Data);
            }
        };

        process.ErrorDataReceived += (sender, e) =>
        {
            if (!string.IsNullOrEmpty(e.Data))
            {
                errorBuilder.AppendLine(e.Data);
            }
        };

        process.Exited += (sender, e) =>
        {
            // Handle process exit

            // Check if any error occurred
            if (process.ExitCode != 0)
            {
                UnityEngine.Debug.LogError("Python process exited with an error. See the console for details.");
                UnityEngine.Debug.LogError($"Error Output: {errorBuilder.ToString()}");
                return;
            }

            // Process the output data
            string outputData = outputBuilder.ToString();
            UnityEngine.Debug.Log($"Output: {outputData}");

            // Handle the generated output text as needed in Unity
        };

        process.Start();
        process.BeginOutputReadLine();
        process.BeginErrorReadLine();

        while (!process.HasExited)
        {
            yield return null;
        }

        // Cleanup resources
        process.Dispose();
    }

    private void OnErrorDataReceived(object sender, DataReceivedEventArgs e)
    {
        if (!string.IsNullOrEmpty(e.Data))
        {
            // Capture the error message from the Python script
            string errorMessage = e.Data;

            // Log the error message with additional information
            UnityEngine.Debug.LogError($"Python script error: {errorMessage}");
            UnityEngine.Debug.LogError($"Python script path: {pythonScriptPath}");

            // Handle the error as needed in your application
        }
    }

    private List<string> chatMessages = new List<string>();
    private List<string> userMessages = new List<string>();

    public void AddChatMessage(string message)
    {
        chatMessages.Add(message);
    }

    public void AddUserMessage(string message)
    {
        userMessages.Add(message);
    }

    private string GetRandomInputTextFromChat()
    {
        if (userMessages.Count > 0)
        {
            //int randomIndex = UnityEngine.Random.Range(0, userMessages.Count);
            string lastMessage = userMessages[userMessages.Count - 1];
            return lastMessage;
        }
        else if (chatMessages.Count > 0)
        {
            int randomIndex = UnityEngine.Random.Range(0, chatMessages.Count);
            string randomMessage = chatMessages[randomIndex];
            return randomMessage;
        }
        else
        {
            return string.Empty;
        }
    }
}

Python Run Code

Because of how this works, Unity3d needs to run a Python script to call the model. I used the code below to do that.


import warnings

# Disable the warning
warnings.filterwarnings("ignore", category=UserWarning)

import sys
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'  # Use CPU only

from transformers import AutoModelForSeq2SeqLM, AutoTokenizer

def main(instruction, knowledge, dialog):
    if len(dialog) < 1:
        print("Dialog is missing.")
        return
    
    # Load the model and tokenizer
    model_directory = "G:\\Unity Projects\\Vtuber\\Assets\\ML_Models\\GODELv1_1largeseq2seq"
    model_name = "pytorch_model.bin"
    model = AutoModelForSeq2SeqLM.from_pretrained(model_directory)
    tokenizer = AutoTokenizer.from_pretrained(model_directory)
    
    if knowledge != '':
        knowledge = '[KNOWLEDGE] ' + knowledge
    dialog = ' EOS '.join(dialog)
    query = f"{instruction} [CONTEXT] {dialog} {knowledge}"
    input_ids = tokenizer.encode(query, return_tensors="pt")
    
    # Generate the output
    output = model.generate(input_ids, max_length=128, min_length=8, top_p=0.9, do_sample=True)

    # Decode the output
    decoded_output = tokenizer.decode(output[0], skip_special_tokens=True)

    # Print the decoded output
    print(decoded_output)
    
# Instruction for a chitchat task
instruction = 'Instruction: given a dialog context, you need to respond empathically.'
# Leave the knowledge empty
knowledge = ''
dialog = [sys.argv[1]]

if __name__ == "__main__":
    main(instruction, knowledge, dialog)
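For reference, the prompt GODEL actually sees is the instruction, the dialog turns joined by a literal ` EOS ` separator inside a `[CONTEXT]` block, and an optional `[KNOWLEDGE]` block. The string assembly from the script above can be checked on its own, with no model loaded:

```python
def build_query(instruction, knowledge, dialog):
    # Prefix knowledge with the [KNOWLEDGE] tag only when it is non-empty
    if knowledge != '':
        knowledge = '[KNOWLEDGE] ' + knowledge
    # Dialog turns are joined with the literal " EOS " separator
    context = ' EOS '.join(dialog)
    return f"{instruction} [CONTEXT] {context} {knowledge}"

instruction = 'Instruction: given a dialog context, you need to respond empathically.'
dialog = ['Hello, how are you doing?', 'Pretty good! You?']
print(build_query(instruction, '', dialog))
```

With an empty knowledge string this leaves a trailing space on the query, which the tokenizer ignores; with multiple turns you can see exactly where the ` EOS ` boundaries land.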

Conclusion

Overall the code works, but the first major downside is that, at least on my computer, it takes 1 to 3 minutes for the model to output a response. This is not ideal. I also had issues with the standalone build I created, as it was not running smoothly. That might be fixable by editing the code, but I view a 1-to-3-minute window as unacceptable. I also tried to use an ONNX model, but I was not able to get it to run in Unity3d, at least not with the model I found. So in the end, I went with a model that was not local to the PC.
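One likely contributor to that delay is that the Python script above reloads the model and tokenizer from disk on every single chat message. An alternative I could have tried is a long-lived Python process that loads the model once and then answers over stdin/stdout. The sketch below shows just that loop, with a hypothetical `generate_reply` stub standing in for the real GODEL call:

```python
import io

def generate_reply(text):
    # Stub standing in for the real GODEL generate() call; in the real
    # process the model and tokenizer would be loaded once, before the loop.
    return "echo: " + text

def serve(stream_in, stream_out):
    # One chat message per input line, one reply per output line
    for line in stream_in:
        line = line.strip()
        if not line:
            continue
        stream_out.write(generate_reply(line) + "\n")
        stream_out.flush()

# Demo with in-memory streams; the real process would pass sys.stdin/sys.stdout
out = io.StringIO()
serve(io.StringIO("Hello, how are you doing?\n"), out)
print(out.getvalue(), end="")  # → echo: Hello, how are you doing?
```

On the Unity side this would mean starting the process once and writing to `RedirectStandardInput` instead of spawning a fresh interpreter per message, so the model-load cost is paid only at startup.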
