Hi everyone.
I have a problem with my content download function.
In practice, it seems like it doesn't close the task/thread and I have no idea how to fix it
my log after crash...
1* ...ts/LoaderTasks/UriFetcher.brs(5) m.top.control = "RUN" 2 ...ts/LoaderTasks/UriFetcher.brs(14) msg = wait(0, m.port) 3 ...ts/LoaderTasks/UriFetcher.brs(14) msg = wait(0, m.port) 4 ...ts/LoaderTasks/UriFetcher.brs(14) msg = wait(0, m.port) 5 ...ts/LoaderTasks/UriFetcher.brs(14) msg = wait(0, m.port) 6 ...ts/LoaderTasks/UriFetcher.brs(14) msg = wait(0, m.port) 7 ...ts/LoaderTasks/UriFetcher.brs(14) msg = wait(0, m.port) ....
101 ...ts/LoaderTasks/UriFetcher.brs(14) msg = wait(0, m.port)
and my code
function init()
m.port = createObject("roMessagePort")
m.top.observeField("request", m.port)
m.top.functionName = "go"
m.top.control = "RUN"
m.ret = true
m.maxNumberXfers = 20
end function
function go() as void
m.jobsById = {}
m.cachedRequest = []
while true
msg = wait(0, m.port)
if msg = invalid
print "No messages received, continuing loop."
continue while
end if
mt = type(msg)
currentTime = getCurrentTime()
for each idKey in m.jobsById
job = m.jobsById[idKey]
if job.startTime <> invalid then
if currentTime - job.startTime > 60000 ' 60 secondi
print "UriFetcher: Timeout for job id: "; idKey
job.xfer.abort() ' Aborta il trasferimento
m.jobsById.delete(idKey) ' Rimuovi il job in timeout
end if
end if
end for
if mt = "roSGNodeEvent"
if msg.getField() = "request"
m.ret = addRequest(msg.getData())
else
print "UriFetcher: unrecognized field '"; msg.getField(); "'"
end if
else if mt = "roUrlEvent"
processResponse(msg)
else
print "UriFetcher: unrecognized event type '"; mt; "'"
end if
if m.ret = false
print "UriFetcher: too many requests"
m.cachedRequest.Push(msg.getData())
print "UriFetcher: a request saved to cache "
end if
printStatus()
end while
end function
function addRequest(request as object) as boolean
if type(request) = "roAssociativeArray"
context = request.context
if type(context) = "roSGNode"
parameters = context.parameters
if type(parameters) = "roAssociativeArray"
uri = parameters.uri
if type(uri) = "roString"
if m.jobsById.Count() < m.maxNumberXfers
newXfer = CreateObject("roUrlTransfer")
newXfer.setUrl(uri)
newXfer.setPort(m.port)
' newXfer.setTimeout(60000)
newXfer.setCertificatesFile("common:/certs/ca-bundle.crt")
newXfer.AddHeader("X-Roku-Reserved-Dev-Id", "")
newXfer.initClientCertificates()
newXfer.addHeader("Content-Type", "application/json")
if m.global.UserInfo <> invalid
print "Authorization" + "Bearer " + m.global.UserInfo.token
newXfer.addHeader("Authorization", "Bearer " + m.global.UserInfo.token)
end if
' should transfer more stuff from parameters to urlXfer
idKey = stri(newXfer.getIdentity()).trim()
if parameters.requestBody <> invalid
postRequest = formatJson(parameters.requestBody)
ok = newXfer.AsyncPostFromString(postRequest)
else
ok = newXfer.AsyncGetToString()
end if
if ok
m.jobsById[idKey] = { context: context, xfer: newXfer }
print "UriFetcher: initiating transfer '"; idkey; "' for URI '"; uri; "'"; " succeeded: "; ok
else
print "UriFetcher: invalid uri: "; uri
end if
else
print "UriFetcher: urlTransferPool is fully used"
return false
end if
end if
end if
end if
end if
return true
end function
function processResponse(msg as object)
idKey = stri(msg.GetSourceIdentity()).trim()
job = m.jobsById[idKey]
if job <> invalid
m.ret = true
context = job.context
parameters = context.parameters
uri = parameters.uri
result = { code: msg.getResponseCode(), content: msg.getString() }
print "UriFetcher: response for transfer job idkey'"; idkey; "' for URI '"; uri; "'"
' next code caould be use for test new api calls,
' just change text to ypur specific end point
' if Instr(1, parameters.uri, "/api/v1/saveprofile") <> 0
' ? result
' end if
' could handle various error codes, retry, etc.
if result.code = 429
headers = msg.GetResponseHeaders()
print "UriFetcher: Server error code: 429"
print "UriFetcher: Server response: Too Many Requests, trying to repeat request after "; headers["retry-after"]; " seconds"
sleep(headers["retry-after"].ToInt() * 1000)
ok = addRequest(job)
if not ok
print "UriFetcher: too many requests in process"
print "UriFetcher: a request saved to cache"
m.cachedRequest.Push(job)
end if
else
job.context.response = result
end if
m.jobsById.delete(idKey)
job.xfer = invalid
job.context = invalid
else
print "UriFetcher: event for unknown job "; idkey
end if
' try to execute a cached job
tryCachedRequests()
if job.xfer <> invalid
job.xfer = invalid ' Invalida l'oggetto roUrlTransfer dopo il completamento
end if
end function
function tryCachedRequests()
if m.cachedRequest.Count() <> 0
sleep(1000) ' Aggiungi un breve ritardo per evitare sovraccarico
request = m.cachedRequest.Shift() ' Prendi la richiesta dalla cache
print "UriFetcher: trying to execute cached request"
ok = addRequest(request)
if not ok
print "UriFetcher: too many requests in process"
m.cachedRequest.Push(request) ' Rimetti la richiesta in cache se non può essere processata
end if
end if
end function
function getCurrentTime() as Integer
dt = CreateObject("roDateTime")
return dt.AsSeconds() ' Restituisce il tempo in millisecondi
end function
sub printStatus()
print "Number of request in cahed queue: "; m.cachedRequest.count()
print "Number of jobs in processing: "; m.jobsById.count()
end sub