Switch to threaded model to make the async model easier to work with

Also keeping request objects around and matching them to error
objects to ease debugging, and more debug output when
PUREX_DEBUG env var is set
This commit is contained in:
Vidar Hokstad 2024-01-11 19:22:43 +00:00
parent 0272848944
commit 5f3aca393e
3 changed files with 128 additions and 66 deletions

View file

@ -31,18 +31,24 @@ module X11
authorize(host, family, display_id)
@requestseq = 1
@queue = []
@rqueue = Queue.new # Read but not returned events
@wqueue = Queue.new
@extensions = {} # Known extensions
@atoms = {} # Interned atoms
@extensions = {}
# Interned atoms
@atoms = {}
start_io
end
def event_handler= block
@event_handler= block
end
def flush
while !@wqueue.empty?
sleep(0.01)
end
end
def display_info
@internal
end
@ -68,8 +74,15 @@ module X11
def read_error data
error = Form::Error.from_packet(StringIO.new(data))
# FIXME: Maybe make this configurable, as it means potentially
# keeping along really heavy requests. or alternative purge them
# more aggressively also when there are no errors, as otherwise
# the growth might be unbounded
error.request = @requests[error.sequence_number]
@requests.keys.find_all{|s| s <= error.sequence_number}.each do |s|
@requests.delete(s)
end
STDERR.puts "ERROR: #{error.inspect}"
raise error.inspect
error
end
@ -127,22 +140,24 @@ module X11
end
def read_full_packet(len = 32)
data = @socket.read_nonblock(32)
data = @socket.read(32)
return nil if data.nil?
while data.length < 32
IO.select([@socket],nil,nil,0.001)
data.concat(@socket.read_nonblock(32 - data.length))
end
return data
rescue IO::WaitReadable
return nil
end
def read_packet timeout=5.0
IO.select([@socket],nil,nil, timeout)
def read_packet
data = read_full_packet(32)
return nil if data.nil?
# FIXME: Make it configurable.
@requests.keys.find_all{|s| s <= @requestseq - 50}.each do |s|
@requests.delete(s)
end
# FIXME: What is bit 8 for? Synthentic?
type = data.unpack("C").first & 0x7f
case type
@ -154,57 +169,100 @@ module X11
end
end
def write_raw_packet(pkt)
@requestseq += 1
@socket.write(pkt)
end
def write_packet(*args)
pkt = args.join
pkt[2..3] = u16(pkt.length/4)
write_raw_packet(pkt)
@wqueue << [nil,nil,pkt]
end
def write_request ob
data = ob.to_packet if ob.respond_to?(:to_packet)
data = ob.to_packet(self) if ob.respond_to?(:to_packet)
raise "BAD LENGTH for #{ob.inspect} (#{ob.request_length.to_i*4} ! #{data.size} " if ob.request_length && ob.request_length.to_i*4 != data.size
write_raw_packet(data)
STDERR.puts "write_req: #{ob.inspect}" if @debug
@wqueue << [ob,nil,data]
end
def write_sync(data, reply=nil)
seq = @requestseq
write_request(data)
pkt = next_reply(seq)
return nil if !pkt
return pkt if pkt.is_a?(X11::Form::Error)
pp reply
def write_sync(ob, reply=nil)
data = ob.to_packet(self) if ob.respond_to?(:to_packet)
q = Queue.new
@wqueue << [ob,q,data]
STDERR.puts "write_sync_req: #{ob.inspect}" if @debug
pkt = q.shift
STDERR.puts "write_sync_rep: #{pkt.inspect}" if @debug
raise(pkt) if pkt.is_a?(X11::Form::Error)
return pkt if !pkt.is_a?(String)
reply ? reply.from_packet(StringIO.new(pkt)) : pkt
end
def peek_packet
!@queue.empty?
end
def peek_packet = !@rqueue.empty?
def next_packet = @rqueue.shift
def next_packet
@queue.shift || read_packet
end
def close = @rqueue.close
def next_reply(errseq)
# FIXME: This is totally broken
while pkt = read_packet
if pkt.is_a?(String)
return pkt
elsif pkt.is_a?(X11::Form::Error) && pkt.sequence_number == errseq
return pkt
else
@queue.push(pkt)
def start_io
@replies ||= {}
@requests ||= {}
# Read thread.
# FIXME: Drop the select.
rt = Thread.new do
while pkt = read_packet
#STDERR.puts "read: #{pkt.inspect}"
if !pkt
sleep 0.1
elsif pkt.is_a?(String)
# This is a reply. We need the sequence number.
#
seq = pkt.unpack1("@2S")
STDERR.puts " - seq= #{seq}" if @debug
STDERR.puts @replies.inspect if @debug
if @replies[seq]
q = @replies.delete(seq)
STDERR.puts " - reply to #{q}" if @debug
q << pkt
end
elsif pkt.is_a?(X11::Form::Error)
if @replies[pkt.sequence_number]
q = @replies.delete(pkt.sequence_number)
q << pkt
else
@rqueue << pkt
end
else
@rqueue << pkt
end
end
@rqueue.close
@replies.values.each(&:close)
end
# Write thread
wt = Thread.new do
while msg = @wqueue.shift
ob, q, data = *msg
@requests[@requestseq] = ob
@replies[@requestseq] = q if q
@requestseq = (@requestseq + 1) % 65536
@socket.write(data)
end
end
at_exit do
flush
@rqueue.close
@wqueue.close
# We kill this because it may be stuck in a read
# we'll never care about
Thread.kill(rt)
# We wait for this to finish because otherwise we may
# lose side-effects
wt.join
end
end
def run
loop do
pkt = read_packet
pkt = next_packet
return if !pkt
yield(pkt)
end
@ -215,13 +273,15 @@ module X11
d.depth == depth }.visuals.find{|v| v.qlass = qlass }
end
def default_root = screens.first.root
# Requests
def create_window(x,y,w,h,
values: {},
depth: 32, parent: nil, border_width: 0, wclass: X11::Form::InputOutput, visual: nil
)
wid = new_id
parent ||= screens.first.root
parent ||= default_root
if visual.nil?
visual = find_visual(0, depth).visual_id
@ -571,7 +631,6 @@ module X11
auth_name = ""
auth_data = ""
end
p [auth_name, auth_data]
handshake = Form::ClientHandshake.new(
Protocol::BYTE_ORDER,
@ -581,7 +640,7 @@ module X11
auth_data
)
@socket.write(handshake.to_packet)
@socket.write(handshake.to_packet(self))
data = @socket.read(1)
raise AuthorizationError, "Failed to read response from server" if !data

View file

@ -44,7 +44,7 @@ module X11
end
end
def to_packet
def to_packet(dpy)
# fetch class level instance variable holding defined fields
structs = self.class.structs
@ -64,10 +64,10 @@ module X11
#p [s,value]
if value.is_a?(BaseForm)
v = value.to_packet
v = value.to_packet(dpy)
else
#p [s,value]
v = s.type_klass.pack(value)
v = s.type_klass.pack(value, dpy)
end
#p v
v
@ -77,15 +77,15 @@ module X11
when :length, :format_length
#p [s,value]
#p [value.size]
s.type_klass.pack(value.size)
s.type_klass.pack(value.size, dpy)
when :string
s.type_klass.pack(value)
s.type_klass.pack(value, dpy)
when :list
Array(value).collect do |obj|
if obj.is_a?(BaseForm)
obj.to_packet
obj.to_packet(dpy)
else
s.type_klass.pack(obj)
s.type_klass.pack(obj, dpy)
end
end
end

View file

@ -11,7 +11,7 @@ module X11
def self.config(d,b) = (@directive, @bytesize = d,b)
def self.pack(x)
def self.pack(x, dpy)
if x.is_a?(Symbol)
if (t = X11::Form.const_get(x)) && t.is_a?(Numeric)
x = t
@ -27,7 +27,6 @@ module X11
def self.from_packet(sock) = unpack(sock.read(size))
end
# Primitive Types
class Int8 < BaseType; config("c",1); end
class Int16 < BaseType; config("s",2); end
class Int32 < BaseType; config("l",4); end
@ -36,16 +35,14 @@ module X11
class Uint32 < BaseType; config("L",4); end
class Message
def self.pack(x) = x.b
def self.unpack(x) = x.b
def self.size = 20
def self.pack(x,dpy) = x.b
def self.unpack(x) = x.b
def self.size = 20
def self.from_packet(sock) = sock.read(2).b
end
class String8
def self.pack(x)
x.b + "\x00"*(-x.length & 3)
end
def self.pack(x, dpy) = (x.b + "\x00"*(-x.length & 3))
def self.unpack(socket, size)
raise "Expected size for String8" if size.nil?
@ -57,7 +54,7 @@ module X11
end
class String16
def self.pack(x)
def self.pack(x, dpy)
x.encode("UTF-16BE").b + "\x00\x00"*(-x.length & 1)
end
@ -71,13 +68,13 @@ module X11
class String8Unpadded
def self.pack(x) = x
def self.pack(x,dpy) = x
def self.unpack(socket, size) = socket.read(size)
end
class Bool
def self.pack(x) = (x ? "\x01" : "\x00")
def self.unpack(str) = (str[0] == "\x01")
def self.pack(x, dpy) = (x ? "\x01" : "\x00")
def self.unpack(str) = (str[0] == "\x01")
def self.size = 1
end
@ -96,10 +93,16 @@ module X11
Colormap = Uint32
Drawable = Uint32
Fontable = Uint32
Atom = Uint32
VisualID = Uint32
Mask = Uint32
Timestamp = Uint32
Keysym = Uint32
class Atom
def self.pack(x,dpy) = [dpy.atom(x)].pack("L")
def self.unpack(x) = x.nil? ? nil : x.unpack1("L")
def self.size = 4
def self.from_packet(sock) = unpack(sock.read(size))
end
end
end