| import gradio as gr |
| import gc, copy, re |
| from rwkv.model import RWKV |
| from rwkv.utils import PIPELINE, PIPELINE_ARGS |
| from huggingface_hub import hf_hub_download |
|
|
# Upper bound on how many prompt tokens are passed to the model on the first
# forward call; the generator keeps only the last `ctx_limit` tokens.
ctx_limit = 4096

# Checkpoint file name inside the Hugging Face repo. It is also used as the
# page title and heading of the demo.
title = "RWKV-x060-World-1B6-v2.1-20240328-ctx4096.pth"

# Resolve the checkpoint through the Hugging Face Hub, then load it with the
# CPU / bf16 strategy and build the tokenizer/sampling pipeline around it.
model_path = hf_hub_download(
    repo_id="BlinkDL/rwkv-6-world",
    filename=title,
)
model = RWKV(model=model_path, strategy="cpu bf16")
pipeline = PIPELINE(model, "rwkv_vocab_v20230424")
|
|
|
|
def _flatten_text(text):
    """Trim *text* and fold its line breaks into single spaces."""
    return (
        text.strip()
        .replace("\r\n", "\n")
        .replace("\n\n", "\n")
        .replace("\n\n", "\n")
        .replace("\n", " ")
    )


def generate_prompt(instruction, input=None, history=None):
    """Build the text prompt that is fed to the model.

    Args:
        instruction: The user's request. Line breaks are flattened to spaces.
        input: Optional extra context for the instruction. Ignored when a
            usable chat history is supplied.
        history: Optional list of ``[user_message, reply]`` pairs. When it
            holds more than one turn and at least one of them has a non-empty
            reply, the completed turns are serialised into the ``Input``
            section and the instruction is replaced by a fixed request to
            answer the last question.

    Returns:
        An ``Instruction/Input/Response`` prompt when there is input text (or
        usable history), otherwise a ``User/Assistant`` prompt.
    """
    if instruction:
        instruction = _flatten_text(instruction)

    if history is not None and len(history) > 1:
        # Only turns that already have a reply carry context; the pending turn
        # (reply is None or "") is represented by `instruction` itself.
        completed_turns = [
            f"{pair[0]},{pair[1]}"
            for pair in history
            if pair[0] is not None and pair[1] is not None and len(pair[1]) > 0
        ]
        # Without any completed turn there is nothing to summarise. Fall
        # through to the regular prompt instead of emitting a malformed
        # "Input: . <instruction>" section.
        if completed_turns:
            input = ",".join(completed_turns) + f". {instruction}"
            instruction = "Generate a Response to the **last** question below."

    if input and len(input) > 0:
        input = _flatten_text(input)
        return f"Instruction: {instruction}\n\nInput: {input}\n\nResponse:"
    return f"User: {instruction}\n\nAssistant:"
|
|
|
|
# Sample rows for the "Example Instructions" dataset. Each row is
# [instruction, input, max tokens, temperature, top p, presence penalty,
# count penalty]; the four sampling values are the same for every example.
examples = [
    [prompt, extra_input, max_tokens, 1.2, 0.5, 0.5, 0.5]
    for prompt, extra_input, max_tokens in (
        ("東京で訪れるべき素晴らしい場所とその紹介をいくつか挙げてください。", "", 300),
        (
            "Écrivez un programme Python pour miner 1 Bitcoin, avec des commentaires.",
            "",
            333,
        ),
        ("Write a song about ravens.", "", 300),
        ("Explain the following metaphor: Life is like cats.", "", 300),
        (
            "Write a story using the following information",
            "A man named Alex chops a tree down",
            333,
        ),
        (
            "Generate a list of adjectives that describe a person as brave.",
            "",
            333,
        ),
        (
            "You have $100, and your goal is to turn that into as much money as possible with AI and Machine Learning. Please respond with detailed plan.",
            "",
            333,
        ),
    )
]
|
|
|
|
def _collapse_blank_lines(text):
    """Normalise line endings, squeeze runs of newlines to one, and trim.

    ``None`` is treated as the empty string.
    """
    text = (text or "").replace("\r\n", "\n")
    return re.sub(r"\n{2,}", "\n", text).strip()


def generator(
    instruction,
    input=None,
    token_count=333,
    temperature=1.0,
    top_p=0.5,
    presencePenalty=0.5,
    countPenalty=0.5,
    history=None,
):
    """Stream a model completion for *instruction*.

    Args:
        instruction: The user's request.
        input: Optional extra context; only used when *history* is ``None``.
        token_count: Maximum number of tokens to sample.
        temperature: Sampling temperature; values below 0.2 are raised to 0.2.
        top_p: Nucleus-sampling threshold.
        presencePenalty: Flat logit penalty for every token already emitted.
        countPenalty: Logit penalty scaled by how often a token was emitted
            (counts decay by a factor of 0.996 per step).
        history: Optional chat history passed through to ``generate_prompt``.

    Yields:
        Without history: the whole text generated so far (stripped), after
        each decodable chunk and once more at the end.
        With history: only the newly decoded chunk, so the caller can append
        it to the running reply.
    """
    args = PIPELINE_ARGS(
        # Floor the temperature at the slider minimum. The previous
        # max(2.0, ...) pinned every request to a temperature of at least 2.0.
        temperature=max(0.2, float(temperature)),
        top_p=float(top_p),
        alpha_frequency=countPenalty,
        alpha_presence=presencePenalty,
        token_ban=[],
        token_stop=[0],
    )

    instruction = _collapse_blank_lines(instruction)
    no_history = history is None
    if no_history:
        # `input` defaults to None; the helper maps that to "".
        input = _collapse_blank_lines(input)
    ctx = generate_prompt(instruction, input, history)
    print(ctx + "\n")

    all_tokens = []
    out_last = 0
    out_str = ""
    occurrence = {}
    # Bound up front so the cleanup below is valid even if the loop body
    # never runs (token_count < 1).
    out = None
    state = None
    try:
        for i in range(int(token_count)):
            # First step consumes the (truncated) prompt; later steps feed
            # back the single token sampled on the previous iteration.
            step_tokens = pipeline.encode(ctx)[-ctx_limit:] if i == 0 else [token]
            out, state = model.forward(step_tokens, state)
            for n in occurrence:
                out[n] -= args.alpha_presence + occurrence[n] * args.alpha_frequency

            token = pipeline.sample_logits(
                out, temperature=args.temperature, top_p=args.top_p
            )
            if token in args.token_stop:
                break
            all_tokens.append(token)
            # Decay earlier counts so old repetitions are penalised less.
            for seen in occurrence:
                occurrence[seen] *= 0.996
            occurrence[token] = occurrence.get(token, 0) + 1

            tmp = pipeline.decode(all_tokens[out_last:])
            # U+FFFD means the pending tokens end inside a multi-byte
            # character; hold them back until the character is complete.
            if "\ufffd" not in tmp:
                out_str += tmp
                if no_history:
                    yield out_str.strip()
                else:
                    yield tmp
                out_last = i + 1
            # A blank line in the output is treated as the end of the answer.
            if "\n\n" in out_str:
                break
    finally:
        # Runs on normal completion and when the consumer closes the
        # generator early, so the logits/state are always released.
        del out
        del state
        gc.collect()

    if no_history:
        yield out_str.strip()
|
|
|
|
def user(message, chatbot):
    """Queue *message* as a new, unanswered chat turn.

    Returns a pair: an empty string and a new list made of the existing turns
    (or none, if *chatbot* is falsy) followed by ``[message, None]``. The
    list passed in is not modified.
    """
    existing_turns = chatbot if chatbot else []
    pending_turn = [message, None]
    return "", existing_turns + [pending_turn]
|
|
|
|
def alternative(chatbot, history):
    """Discard the latest reply and restore ``history[0]`` from ``history[1]``.

    When either argument is empty or ``None`` both are returned untouched.
    Otherwise the reply slot of the last chat turn is set to ``None`` and
    ``history[0]`` is overwritten with a deep copy of ``history[1]``. Both
    objects are modified in place and returned.

    NOTE(review): presumably ``history[1]`` is a snapshot taken before the
    last reply was generated; confirm against the caller. A one-element
    ``history`` raises ``IndexError``.
    """
    if chatbot and history:
        last_turn = chatbot[-1]
        last_turn[1] = None
        snapshot = history[1]
        history[0] = copy.deepcopy(snapshot)
    return chatbot, history
|
|
|
|
# Build the two-tab Gradio UI ("Chat mode" and "Instruct mode") and launch it.
# NOTE(review): the nesting of the `with` blocks below was reconstructed from
# a copy of the file that had lost its indentation — confirm the component
# layout against the deployed app.
with gr.Blocks(title=title) as demo:
    gr.HTML(f'<div style="text-align: center;">\n<h1>🌍Chat - {title}</h1>\n</div>')
    gr.Markdown(
        f"100% RNN RWKV-LM **trained on 12+ natural languages**. Demo limited to ctxlen {ctx_limit}. For best results, <b>write short imperative prompts</b> like commands and requests. Example: use \"Tell me what my name is\" instead of \"What's my name?\"."
        + "\n\n"
        + f"Clone this space for faster inference if you can run the app on GPU or better CPU. To use CUDA, replace <code>strategy='cpu bf16'</code> with <code>strategy='cuda fp16'</code> in `app.py`."
    )
    with gr.Tab("Chat mode"):
        with gr.Row():
            # First column: conversation view, message box and clear button.
            with gr.Column():
                chatbot = gr.Chatbot()
                msg = gr.Textbox(
                    scale=4,
                    show_label=False,
                    placeholder="Enter text and press enter",
                    container=False,
                )
                clear = gr.ClearButton([msg, chatbot])
            # Second column: sampling controls for chat generation.
            with gr.Column():
                token_count_chat = gr.Slider(
                    10, 512, label="Max Tokens", step=10, value=333
                )
                temperature_chat = gr.Slider(
                    0.2, 2.0, label="Temperature", step=0.1, value=1.2
                )
                top_p_chat = gr.Slider(0.0, 1.0, label="Top P", step=0.05, value=0.3)
                presence_penalty_chat = gr.Slider(
                    0.0, 1.0, label="Presence Penalty", step=0.1, value=0
                )
                count_penalty_chat = gr.Slider(
                    0.0, 1.0, label="Count Penalty", step=0.1, value=0.7
                )

        # NOTE(review): clear_chat is not referenced anywhere in the visible
        # code; clearing is handled by the ClearButton above.
        def clear_chat():
            """Return an empty string and an empty list."""
            return "", []

        def user_msg(message, history):
            """Append *message* to the history as a turn with no reply yet.

            Returns an empty string (wired to the message box) and the
            extended history (wired to the chatbot).
            """
            history = history or []
            return "", history + [[message, None]]

        def respond(history, token_count, temperature, top_p, presence_penalty, count_penalty):
            """Stream the model's reply into the last turn of *history*.

            Takes the latest user message as the instruction, resets the
            reply slot to "", and appends each chunk yielded by `generator`
            to it, yielding the updated history after every chunk.
            """
            instruction = history[-1][0]
            history[-1][1] = ""

            # `generator` is called with a history, so each yielded item is
            # only the newly decoded text and can be appended directly.
            for character in generator(
                instruction,
                None,
                token_count,
                temperature,
                top_p,
                presence_penalty,
                count_penalty,
                history
            ):
                history[-1][1] += character
                yield history

        # On submit: first record the user's message, then stream the reply.
        msg.submit(user_msg, [msg, chatbot], [msg, chatbot], queue=False).then(
            respond, [chatbot, token_count_chat, temperature_chat, top_p_chat, presence_penalty_chat, count_penalty_chat], chatbot, api_name="chat"
        )

    with gr.Tab("Instruct mode"):
        with gr.Row():
            # First column: instruction/input text and sampling controls.
            with gr.Column():
                instruction = gr.Textbox(
                    lines=2,
                    label="Instruction",
                    value="東京で訪れるべき素晴らしい場所とその紹介をいくつか挙げてください。",
                )
                input_instruct = gr.Textbox(
                    lines=2, label="Input", placeholder="", value=""
                )
                token_count_instruct = gr.Slider(
                    10, 512, label="Max Tokens", step=10, value=333
                )
                temperature_instruct = gr.Slider(
                    0.2, 2.0, label="Temperature", step=0.1, value=1.2
                )
                top_p_instruct = gr.Slider(
                    0.0, 1.0, label="Top P", step=0.05, value=0.3
                )
                presence_penalty_instruct = gr.Slider(
                    0.0, 1.0, label="Presence Penalty", step=0.1, value=0
                )
                count_penalty_instruct = gr.Slider(
                    0.0, 1.0, label="Count Penalty", step=0.1, value=0.7
                )
            # Second column: action buttons and the generated output.
            with gr.Column():
                with gr.Row():
                    submit = gr.Button("Submit", variant="primary")
                    # Rebinds `clear`, which previously named the chat tab's
                    # ClearButton.
                    clear = gr.Button("Clear", variant="secondary")
                output = gr.Textbox(label="Output", lines=5)
        # Example rows; the column order matches the `examples` entries.
        data = gr.Dataset(
            components=[
                instruction,
                input_instruct,
                token_count_instruct,
                temperature_instruct,
                top_p_instruct,
                presence_penalty_instruct,
                count_penalty_instruct,
            ],
            samples=examples,
            label="Example Instructions",
            headers=[
                "Instruction",
                "Input",
                "Max Tokens",
                "Temperature",
                "Top P",
                "Presence Penalty",
                "Count Penalty",
            ],
        )
        # No history is passed here, so `generator` yields the full text
        # produced so far on every step.
        submit.click(
            generator,
            [
                instruction,
                input_instruct,
                token_count_instruct,
                temperature_instruct,
                top_p_instruct,
                presence_penalty_instruct,
                count_penalty_instruct,
            ],
            [output],
        )
        # "Clear" only resets the output box; the inputs are left as they are.
        clear.click(lambda: None, [], [output])
        # Presumably copies the clicked example row into the seven input
        # widgets — confirm against the Gradio Dataset documentation.
        data.click(
            lambda x: x,
            [data],
            [
                instruction,
                input_instruct,
                token_count_instruct,
                temperature_instruct,
                top_p_instruct,
                presence_penalty_instruct,
                count_penalty_instruct,
            ],
        )


# Enable the request queue with max_size=10, then start the app without a
# public share link.
demo.queue(max_size=10)
demo.launch(share=False)
|
|